diff --git ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
index b805904..9f52ebe 100644
--- ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
+++ ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
@@ -53,7 +53,11 @@
   // important because in that case we shouldn't acquire a lock for it or authorize the read.
   // These will be handled by the output to the table instead.
   private boolean isUpdateOrDelete = false;
-  //https://issues.apache.org/jira/browse/HIVE-15048
+  /**
+   * https://issues.apache.org/jira/browse/HIVE-15048
+   * It is possible that the same table is used in the top level query and a sub-query, e.g.
+   * select * from T where T.c in (select c from T inner join S on T.a=S.b)
+   */
   public transient boolean isFromTopLevelQuery = true;
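The flag above matters to anything that walks the ReadEntity set to plan locks or authorization: a top-level read of the update/delete target is already covered by the table's WriteEntity, while a read of the same table inside a sub-query is a genuine read. A minimal sketch of such a consumer (illustrative only, not DbTxnManager's actual code; LockPlanSketch and planReadLocks are invented names):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;

    import org.apache.hadoop.hive.ql.hooks.ReadEntity;

    class LockPlanSketch {
      /** Describes the shared locks a query's reads would need. */
      static List<String> planReadLocks(Set<ReadEntity> inputs) {
        List<String> locks = new ArrayList<>();
        for (ReadEntity re : inputs) {
          // A top-level read of the update/delete target is covered by the
          // table's WriteEntity; the same table read in a sub-query is not.
          if (re.isUpdateOrDelete() && re.isFromTopLevelQuery) {
            continue;
          }
          locks.add("SHARED_READ: " + re.getName());
        }
        return locks;
      }
    }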
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
index 027eb68..4a1cde7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
@@ -455,35 +455,8 @@ else if(deleting()) {
       useSuper = false;
     }
-    markReadEntityForUpdate();
+    updateOutputs(mTable);
-    if (inputIsPartitioned(inputs)) {
-      //todo: there are bugs here: https://issues.apache.org/jira/browse/HIVE-15048
-      // In order to avoid locking the entire write table we need to replace the single WriteEntity
-      // with a WriteEntity for each partition
-      assert outputs.size() == 1 : "expected 1 WriteEntity. Got " + outputs;//this asserts comment above
-      WriteEntity original = null;
-      for(WriteEntity we : outputs) {
-        original = we;
-      }
-      outputs.clear();
-      for (ReadEntity input : inputs) {
-        /**
-         * The assumption here is that SemanticAnalyzer will will generate ReadEntity for each
-         * partition that exists and is matched by the WHERE clause (which may be all of them).
-         * Since we don't allow updating the value of a partition column, we know that we always
-         * write the same (or fewer) partitions than we read. Still, the write is a Dynamic
-         * Partition write - see HIVE-15032.
-         */
-        if (input.getTyp() == Entity.Type.PARTITION) {
-          WriteEntity.WriteType writeType = deleting() ? WriteEntity.WriteType.DELETE :
-            WriteEntity.WriteType.UPDATE;
-          WriteEntity we = new WriteEntity(input.getPartition(), writeType);
-          we.setDynamicPartitionWrite(original.isDynamicPartitionWrite());
-          outputs.add(we);
-        }
-      }
-    }
     if (updating()) {
       setUpAccessControlInfoForUpdate(mTable, setCols);
@@ -517,17 +490,6 @@ private String operation() {
     return currentOperation.toString();
   }
-  private boolean inputIsPartitioned(Set<ReadEntity> inputs) {
-    // We cannot simply look at the first entry, as in the case where the input is partitioned
-    // there will be a table entry as well. So look for at least one partition entry.
-    for (ReadEntity re : inputs) {
-      if (re.getTyp() == Entity.Type.PARTITION) {
-        return true;
-      }
-    }
-    return false;
-  }
-
   // This method finds any columns on the right side of a set statement (thus rcols) and puts them
   // in a set so we can add them to the list of input cols to check.
   private void addSetRCols(ASTNode node, Set<String> setRCols) {
@@ -708,7 +670,17 @@ WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2)
     } finally {
       useSuper = false;
     }
+    updateOutputs(targetTable);
+  }
+  /**
+   * SemanticAnalyzer will generate a WriteEntity for the target table since it doesn't know/check
+   * if the read and write are of the same table in "insert ... select ....". Since DbTxnManager
+   * uses Read/WriteEntity objects to decide which locks to acquire, we get more concurrency if we
+   * change the table WriteEntity to a set of partition WriteEntity objects based on
+   * ReadEntity objects computed for this table.
+   */
+  private void updateOutputs(Table targetTable) {
     markReadEntityForUpdate();

     if(targetTable.isPartitioned()) {
@@ -721,6 +693,13 @@ WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2)
       WriteEntity.WriteType wt = we.getWriteType();
       if(isTargetTable(we, targetTable) &&
         (wt == WriteEntity.WriteType.UPDATE || wt == WriteEntity.WriteType.DELETE)) {
+        /**
+         * The assumption here is that SemanticAnalyzer will generate ReadEntity for each
+         * partition that exists and is matched by the WHERE clause (which may be all of them).
+         * Since we don't allow updating the value of a partition column, we know that we always
+         * write the same (or fewer) partitions than we read. Still, the write is a Dynamic
+         * Partition write - see HIVE-15032.
+         */
         toRemove.add(we);
       }
     }
@@ -748,7 +727,8 @@ WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2)
    * be able to not use DP for the Insert...
    *
    * Note that the Insert of Merge may be creating new partitions and writing to partitions
-   * which were not read (WHEN NOT MATCHED...)
+   * which were not read (WHEN NOT MATCHED...). WriteEntity for that should be created
+   * in MoveTask (or some other task after the query is complete).
    */
   private List<Partition> getRestrictedPartitionSet(Table targetTable) {
     List<Partition> partitionsRead = new ArrayList<>();
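Condensed, the per-partition rewrite that the new updateOutputs() performs looks roughly like the following sketch. It is based on the inline code removed above and on the javadoc, not on the method's verbatim body; the isTargetTable() check and the dynamic-partition-write flag are omitted, and UpdateOutputsSketch/rewrite are invented names:

    import java.util.HashSet;
    import java.util.Set;

    import org.apache.hadoop.hive.ql.hooks.Entity;
    import org.apache.hadoop.hive.ql.hooks.ReadEntity;
    import org.apache.hadoop.hive.ql.hooks.WriteEntity;

    class UpdateOutputsSketch {
      static void rewrite(Set<ReadEntity> inputs, Set<WriteEntity> outputs,
                          boolean deleting) {
        // Drop the coarse table-level write entity for the update/delete target.
        Set<WriteEntity> toRemove = new HashSet<>();
        for (WriteEntity we : outputs) {
          WriteEntity.WriteType wt = we.getWriteType();
          if (wt == WriteEntity.WriteType.UPDATE || wt == WriteEntity.WriteType.DELETE) {
            toRemove.add(we);
          }
        }
        outputs.removeAll(toRemove);
        // Each partition matched by the WHERE clause was recorded as a ReadEntity;
        // since partition columns cannot be updated, the write touches a subset of them.
        for (ReadEntity input : inputs) {
          if (input.getTyp() == Entity.Type.PARTITION) {
            WriteEntity.WriteType wt = deleting
                ? WriteEntity.WriteType.DELETE : WriteEntity.WriteType.UPDATE;
            outputs.add(new WriteEntity(input.getPartition(), wt));
          }
        }
      }
    }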
diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 637a01a..3c9358d 100644
--- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -2098,35 +2098,4 @@ private void testMergePartitioned(boolean causeConflict) throws Exception {
       TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnid2));
     }
   }
-  //https://issues.apache.org/jira/browse/HIVE-15048
-  @Test
-  @Ignore("for some reason this fails with NPE in setUp() when run as part of the suite, but not standalone..")
-  public void testUpdateWithSubquery() throws Exception {
-    dropTable(new String[] {"target", "source"});
-    checkCmdOnDriver(driver.run("create table target (a int, b int) " +
-      "partitioned by (p int, q int) clustered by (a) into 2 buckets " +
-      "stored as orc TBLPROPERTIES ('transactional'='true')"));
-    checkCmdOnDriver(driver.run("create table source (a1 int, b1 int, p1 int, q1 int) clustered by (a1) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"));
-
-    checkCmdOnDriver(driver.run("insert into target partition(p,q) values (1,2,1,2), (3,4,1,2), (5,6,1,3), (7,8,2,2)"));
-
-    checkCmdOnDriver(driver.run(
-"update target set b = 1 where p in (select t.q1 from source t where t.a1=5)"));
-/**
- * So the above query fails with invalid reference 'p' (in subquery) (as as if u use t.p)
- * But before it fails, here is inputs/outpus before/after UpdateDeleteSemanticAnalyzer
-* Before UDSA
-* inputs: [default@target, default@target@p=1/q=2, default@target@p=1/q=3, default@target@p=2/q=2]
-* outputs: [default@target]
-*
-* after UDSA
-* inputs: [default@target, default@target@p=1/q=2, default@target@p=1/q=3, default@target@p=2/q=2]
-* outputs: [default@target@p=1/q=2, default@target@p=1/q=3, default@target@p=2/q=2]
-*
-* So it looks like....
-*/
-    checkCmdOnDriver(driver.run(
-      "update source set b1 = 1 where p1 in (select t.q from target t where t.p=2)"));
-
-  }
 }
diff --git ql/src/test/queries/clientpositive/acid_subquery.q ql/src/test/queries/clientpositive/acid_subquery.q
new file mode 100644
index 0000000..ab87d4c
--- /dev/null
+++ ql/src/test/queries/clientpositive/acid_subquery.q
@@ -0,0 +1,18 @@
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+
+drop table if exists target;
+drop table if exists source;
+
+create table target (a int, b int) partitioned by (p int, q int) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true');
+create table source (a1 int, b1 int, p1 int, q1 int) clustered by (a1) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true');
+insert into target partition(p,q) values (1,2,1,2), (3,4,1,2), (5,6,1,3), (7,8,2,2);
+
+-- the intent here is to record the set of ReadEntity and WriteEntity objects for these 2 update statements
+update target set b = 1 where p in (select t.q1 from source t where t.a1=5);
+
+update source set b1 = 1 where p1 in (select t.q from target t where t.p=2);
+
+-- the extra predicates in each when matched clause match 1 partition
+merge into target t using source s on t.a = s.a1 when matched and p = 1 and q = 2 then update set b = 1 when matched and p = 2 and q = 2 then delete when not matched and a1 > 100 then insert values(s.a1,s.b1,s.p1, s.q1);
\ No newline at end of file
diff --git ql/src/test/results/clientpositive/acid_subquery.q.out ql/src/test/results/clientpositive/acid_subquery.q.out
new file mode 100644
index 0000000..b977768
--- /dev/null
+++ ql/src/test/results/clientpositive/acid_subquery.q.out
@@ -0,0 +1,101 @@
+PREHOOK: query: drop table if exists target
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists target
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table if exists source
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists source
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table target (a int, b int) partitioned by (p int, q int) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@target
+POSTHOOK: query: create table target (a int, b int) partitioned by (p int, q int) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@target
+PREHOOK: query: create table source (a1 int, b1 int, p1 int, q1 int) clustered by (a1) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@source
+POSTHOOK: query: create table source (a1 int, b1 int, p1 int, q1 int) clustered by (a1) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@source
+PREHOOK: query: insert into target partition(p,q) values (1,2,1,2), (3,4,1,2), (5,6,1,3), (7,8,2,2)
+PREHOOK: type: QUERY
+PREHOOK: Output: default@target
+POSTHOOK: query: insert into target partition(p,q) values (1,2,1,2), (3,4,1,2), (5,6,1,3), (7,8,2,2)
+POSTHOOK: type: QUERY
+POSTHOOK: Output: default@target@p=1/q=2
+POSTHOOK: Output: default@target@p=1/q=3
+POSTHOOK: Output: default@target@p=2/q=2
+POSTHOOK: Lineage: target PARTITION(p=1,q=2).a EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: target PARTITION(p=1,q=2).b EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+POSTHOOK: Lineage: target PARTITION(p=1,q=3).a EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: target PARTITION(p=1,q=3).b EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+POSTHOOK: Lineage: target PARTITION(p=2,q=2).a EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: target PARTITION(p=2,q=2).b EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: -- the intent here is to record the set of ReadEntity and WriteEntity objects for these 2 update statements
+update target set b = 1 where p in (select t.q1 from source t where t.a1=5)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@source
+PREHOOK: Input: default@target
+PREHOOK: Input: default@target@p=1/q=2
+PREHOOK: Input: default@target@p=1/q=3
+PREHOOK: Input: default@target@p=2/q=2
+PREHOOK: Output: default@target@p=1/q=2
+PREHOOK: Output: default@target@p=1/q=3
+PREHOOK: Output: default@target@p=2/q=2
+POSTHOOK: query: -- the intent here is to record the set of ReadEntity and WriteEntity objects for these 2 update statements
+update target set b = 1 where p in (select t.q1 from source t where t.a1=5)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@source
+POSTHOOK: Input: default@target
+POSTHOOK: Input: default@target@p=1/q=2
+POSTHOOK: Input: default@target@p=1/q=3
+POSTHOOK: Input: default@target@p=2/q=2
+POSTHOOK: Output: default@target@p=1/q=2
+POSTHOOK: Output: default@target@p=1/q=3
+POSTHOOK: Output: default@target@p=2/q=2
+PREHOOK: query: update source set b1 = 1 where p1 in (select t.q from target t where t.p=2)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@source
+PREHOOK: Input: default@target
+PREHOOK: Input: default@target@p=2/q=2
+PREHOOK: Output: default@source
+POSTHOOK: query: update source set b1 = 1 where p1 in (select t.q from target t where t.p=2)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@source
+POSTHOOK: Input: default@target
+POSTHOOK: Input: default@target@p=2/q=2
+POSTHOOK: Output: default@source
+PREHOOK: query: -- the extra predicates in each when matched clause match 1 partition
+merge into target t using source s on t.a = s.a1 when matched and p = 1 and q = 2 then update set b = 1 when matched and p = 2 and q = 2 then delete when not matched and a1 > 100 then insert values(s.a1,s.b1,s.p1, s.q1)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@source
+PREHOOK: Input: default@target
+PREHOOK: Input: default@target@p=1/q=2
+PREHOOK: Input: default@target@p=1/q=3
+PREHOOK: Input: default@target@p=2/q=2
+PREHOOK: Output: default@target
+PREHOOK: Output: default@target@p=1/q=2
+PREHOOK: Output: default@target@p=1/q=2
+PREHOOK: Output: default@target@p=1/q=3
+PREHOOK: Output: default@target@p=1/q=3
+PREHOOK: Output: default@target@p=2/q=2
+PREHOOK: Output: default@target@p=2/q=2
+POSTHOOK: query: -- the extra predicates in each when matched clause match 1 partition
+merge into target t using source s on t.a = s.a1 when matched and p = 1 and q = 2 then update set b = 1 when matched and p = 2 and q = 2 then delete when not matched and a1 > 100 then insert values(s.a1,s.b1,s.p1, s.q1)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@source
+POSTHOOK: Input: default@target
+POSTHOOK: Input: default@target@p=1/q=2
+POSTHOOK: Input: default@target@p=1/q=3
+POSTHOOK: Input: default@target@p=2/q=2
+POSTHOOK: Output: default@target@p=1/q=2
+POSTHOOK: Output: default@target@p=1/q=2
+POSTHOOK: Output: default@target@p=1/q=3
+POSTHOOK: Output: default@target@p=1/q=3
+POSTHOOK: Output: default@target@p=2/q=2
+POSTHOOK: Output: default@target@p=2/q=2
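The partition-level PREHOOK/POSTHOOK Output entities recorded above are what DbTxnManager translates into partition-granularity locks. A hypothetical check in the style of TestDbTxnManager2 (the test name is invented; checkCmdOnDriver, getLocks and checkLock are assumed to be the helpers that class already uses, with their current signatures):

    @Test
    public void testPartitionLocksForUpdateWithSubquery() throws Exception {
      // Compile the first update from acid_subquery.q and acquire its locks.
      checkCmdOnDriver(driver.compileAndRespond(
          "update target set b = 1 where p in (select t.q1 from source t where t.a1=5)"));
      txnMgr.acquireLocks(driver.getPlan(), ctx, "T1");
      List<ShowLocksResponseElement> locks = getLocks(txnMgr);
      // Expect partition-level SHARED_WRITE locks, not one table-level lock on 'target'.
      checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=2", locks);
      checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=3", locks);
      checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=2/q=2", locks);
    }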