diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 7a8aa899c4..75c7c76d18 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2408,6 +2408,10 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "In nonstrict mode, for non-ACID resources, INSERT will only acquire shared lock, which\n" + "allows two concurrent writes to the same partition but still lets lock manager prevent\n" + "DROP TABLE etc. when the table is being written to"), + TXN_OVERWRITE_X_LOCK("hive.txn.xlock.iow", true, + "Ensures commands with OVERWRITE (such as INSERT OVERWRITE) acquire Exclusive locks for\b" + + "transactional tables. This ensures that inserts (w/o overwrite) running concurrently\n" + + "are not hidden by the INSERT OVERWRITE."), /** * @deprecated Use MetastoreConf.TXN_TIMEOUT */ diff --git ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java index c465adf1e2..f1cf113812 100644 --- ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java +++ ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java @@ -63,7 +63,7 @@ public WriteEntity() { public WriteEntity(Database database, WriteType type) { super(database, true); - writeType = type; + setWriteTypeInternal(type); } /** @@ -74,12 +74,12 @@ public WriteEntity(Database database, WriteType type) { */ public WriteEntity(Table t, WriteType type) { super(t, true); - writeType = type; + setWriteTypeInternal(type); } public WriteEntity(Table t, WriteType type, boolean complete) { super(t, complete); - writeType = type; + setWriteTypeInternal(type); } /** @@ -104,12 +104,12 @@ public WriteEntity(Database db, String objName, String className, Type type, Wri */ public WriteEntity(Partition p, WriteType type) { super(p, true); - writeType = type; + setWriteTypeInternal(type); } public WriteEntity(DummyPartition p, WriteType type, boolean complete) { super(p, complete); - writeType = type; + setWriteTypeInternal(type); } /** @@ -161,6 +161,9 @@ public WriteType getWriteType() { * @param type new operation type */ public void setWriteType(WriteType type) { + setWriteTypeInternal(type); + } + private void setWriteTypeInternal(WriteType type) { writeType = type; } diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 76569d5ec3..4617b00064 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -510,7 +510,11 @@ Seems much cleaner if each stmt is identified as a particular HiveOperation (whi case INSERT_OVERWRITE: t = getTable(output); if (AcidUtils.isTransactionalTable(t)) { - compBuilder.setSemiShared(); + if(conf.getBoolVar(HiveConf.ConfVars.TXN_OVERWRITE_X_LOCK)) { + compBuilder.setExclusive(); + } else { + compBuilder.setSemiShared(); + } compBuilder.setOperationType(DataOperationType.UPDATE); } else { compBuilder.setExclusive(); diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 1dccf969ff..36f801fd55 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -14373,7 +14373,7 @@ private void addAlternateGByKeyMappings(ASTNode gByExpr, ColumnInfo colInfo, if (isNonNativeTable) { return WriteEntity.WriteType.INSERT_OVERWRITE; } else { - return ((ltd.getLoadFileType() == LoadFileType.REPLACE_ALL) + return ((ltd.getLoadFileType() == LoadFileType.REPLACE_ALL || ltd.isInsertOverwrite()) ? WriteEntity.WriteType.INSERT_OVERWRITE : getWriteType(dest)); } } diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index 4c7ec16227..3cf264d532 100644 --- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -30,9 +30,12 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksRequest; import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.txn.AcidWriteSetService; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.Table; import org.junit.After; import org.junit.Assert; import org.apache.hadoop.hive.common.FileUtils; @@ -197,10 +200,20 @@ public void createTable() throws Exception { } @Test public void insertOverwriteCreate() throws Exception { + insertOverwriteCreate(false); + } + @Test + public void insertOverwriteCreateAcid() throws Exception { + insertOverwriteCreate(true); + } + private void insertOverwriteCreate(boolean isTransactional) throws Exception { + if(isTransactional) { + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID, true); + } dropTable(new String[] {"T2", "T3"}); CommandProcessorResponse cpr = driver.run("create table if not exists T2(a int)"); checkCmdOnDriver(cpr); - cpr = driver.run("create table if not exists T3(a int)"); + cpr = driver.run("create table if not exists T3(a int) stored as ORC"); checkCmdOnDriver(cpr); cpr = driver.compileAndRespond("insert overwrite table T3 select a from T2", true); checkCmdOnDriver(cpr); @@ -218,12 +231,24 @@ public void insertOverwriteCreate() throws Exception { } @Test public void insertOverwritePartitionedCreate() throws Exception { + insertOverwritePartitionedCreate(true); + } + @Test + public void insertOverwritePartitionedCreateAcid() throws Exception { + insertOverwritePartitionedCreate(false); + } + private void insertOverwritePartitionedCreate(boolean isTransactional) throws Exception { dropTable(new String[] {"T4"}); - CommandProcessorResponse cpr = driver.run("create table if not exists T4 (name string, gpa double) partitioned by (age int)"); + if(isTransactional) { + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID, true); + } + CommandProcessorResponse cpr = driver.run("create table if not exists T4" + + "(name string, gpa double) partitioned by (age int) stored as ORC"); checkCmdOnDriver(cpr); cpr = driver.run("create table if not exists T5(name string, age int, gpa double)"); checkCmdOnDriver(cpr); - cpr = driver.compileAndRespond("INSERT OVERWRITE TABLE T4 PARTITION (age) SELECT name, age, gpa FROM T5", true); + cpr = driver.compileAndRespond("INSERT OVERWRITE TABLE T4 PARTITION (age) SELECT " + + "name, age, gpa FROM T5", true); checkCmdOnDriver(cpr); txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer"); List locks = getLocks(); @@ -2454,7 +2479,25 @@ public void testAddPartitionLocks() throws Exception { Assert.assertEquals("Unexpected lock count", 1, locks.size()); checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T", null, locks); } + @Test + public void testLoadData() throws Exception { + dropTable(new String[] {"T2", "T3"}); + CommandProcessorResponse cpr = driver.run("create table if not exists T2(a int) " + + "stored as ORC TBLPROPERTIES ('transactional'='true')"); + checkCmdOnDriver(cpr); + checkCmdOnDriver(driver.run("insert into T2 values(1)")); + String exportLoc = exportFolder.newFolder("1").toString(); + checkCmdOnDriver(driver.run("export table T2 to '" + exportLoc + "/2'")); + cpr = driver.compileAndRespond( + "load data inpath '" + exportLoc + "/2/data' overwrite into table T2"); + checkCmdOnDriver(cpr); + txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer"); + List locks = getLocks(); + Assert.assertEquals("Unexpected lock count", 1, locks.size()); + checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T2", null, locks); + txnMgr.commitTxn(); + } @Test public void testMmConversionLocks() throws Exception { dropTable(new String[] {"T"});