diff --git common/src/java/org/apache/hadoop/hive/common/JavaUtils.java common/src/java/org/apache/hadoop/hive/common/JavaUtils.java index a212fb8..3dd8f75 100644 --- common/src/java/org/apache/hadoop/hive/common/JavaUtils.java +++ common/src/java/org/apache/hadoop/hive/common/JavaUtils.java @@ -136,6 +136,14 @@ public static void closeClassLoader(ClassLoader loader) throws IOException { LogFactory.release(loader); } + /** + * Utility method for ACID to normalize logging info + * @param extLockId LockResponse.lockid + */ + public static String lockIdToString(long extLockId) { + return "lockid:" + extLockId; + } + private JavaUtils() { // prevent instantiation } diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 1e64fc7..704c3ed 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -28,6 +28,7 @@ import org.apache.commons.pool.ObjectPool; import org.apache.commons.pool.impl.GenericObjectPool; +import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.conf.HiveConf; @@ -1115,6 +1116,8 @@ protected DatabaseProduct determineDatabaseProduct(Connection conn) throws MetaE private static class LockInfo { private final long extLockId; private final long intLockId; + //0 means there is no transaction, i.e. it a select statement which is not part of + //explicit transaction or a IUD statement that is not writing to ACID table private final long txnId; private final String db; private final String table; @@ -1144,7 +1147,7 @@ protected DatabaseProduct determineDatabaseProduct(Connection conn) throws MetaE default: throw new MetaException("Unknown lock type " + rs.getString("hl_lock_type").charAt(0)); } - txnId = rs.getLong("hl_txnid"); + txnId = rs.getLong("hl_txnid");//returns 0 if value is NULL } LockInfo(ShowLocksResponseElement e, long intLockId) { extLockId = e.getLockid(); @@ -1166,7 +1169,7 @@ public boolean equals(Object other) { @Override public String toString() { - return "extLockId:" + Long.toString(extLockId) + " intLockId:" + + return JavaUtils.lockIdToString(extLockId) + " intLockId:" + intLockId + " txnId:" + Long.toString (txnId) + " db:" + db + " table:" + table + " partition:" + partition + " state:" + (state == null ? "null" : state.toString()) @@ -1642,10 +1645,17 @@ private LockResponse checkLock(Connection dbConn, * on a database. */ private boolean ignoreConflict(LockInfo desiredLock, LockInfo existingLock) { - return (desiredLock.isDbLock() && desiredLock.type == LockType.SHARED_READ && - existingLock.isTableLock() && existingLock.type == LockType.EXCLUSIVE) || - (existingLock.isDbLock() && existingLock.type == LockType.SHARED_READ && - desiredLock.isTableLock() && desiredLock.type == LockType.EXCLUSIVE); + return + ((desiredLock.isDbLock() && desiredLock.type == LockType.SHARED_READ && + existingLock.isTableLock() && existingLock.type == LockType.EXCLUSIVE) || + (existingLock.isDbLock() && existingLock.type == LockType.SHARED_READ && + desiredLock.isTableLock() && desiredLock.type == LockType.EXCLUSIVE)) + || + //different locks from same txn should not conflict with each other + (desiredLock.txnId != 0 && desiredLock.txnId == existingLock.txnId) || + //txnId=0 means it's a select or IUD which does not write to ACID table, e.g + //insert overwrite table T partition(p=1) select a,b from T and autoCommit=true + (desiredLock.txnId == 0 && desiredLock.extLockId == existingLock.extLockId); } private void wait(Connection dbConn, Savepoint save) throws SQLException { diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java index 805e090..e8c49ef 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java @@ -19,6 +19,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.*; @@ -129,9 +130,9 @@ LockState checkLock(long extLockId) throws LockException { public void unlock(HiveLock hiveLock) throws LockException { long lockId = ((DbHiveLock)hiveLock).lockId; try { - LOG.debug("Unlocking id:" + lockId); + LOG.debug("Unlocking " + hiveLock); client.unlock(lockId); - boolean removed = locks.remove((DbHiveLock)hiveLock); + boolean removed = locks.remove(hiveLock); LOG.debug("Removed a lock " + removed); } catch (NoSuchLockException e) { LOG.error("Metastore could find no record of lock " + lockId); @@ -228,6 +229,10 @@ public boolean equals(Object other) { public int hashCode() { return (int)(lockId % Integer.MAX_VALUE); } + @Override + public String toString() { + return JavaUtils.lockIdToString(lockId); + } } /** diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index ac5ae2a..1431e19 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -44,7 +44,9 @@ private Driver d; private static enum Table { ACIDTBL("acidTbl"), - NONACIDORCTBL("nonAcidOrcTbl"); + ACIDTBLPART("acidTblPart"), + NONACIDORCTBL("nonAcidOrcTbl"), + NONACIDPART("nonAcidPart"); private final String name; @Override @@ -78,7 +80,9 @@ public void setUp() throws Exception { d = new Driver(hiveConf); dropTables(); runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + runStatementOnDriver("create table " + Table.ACIDTBLPART + "(a int, b int) partitioned by (p string) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')"); + runStatementOnDriver("create table " + Table.NONACIDPART + "(a int, b int) partitioned by (p string) stored as orc TBLPROPERTIES ('transactional'='false')"); } private void dropTables() throws Exception { for(Table t : Table.values()) { @@ -138,6 +142,27 @@ public void testDeleteIn() throws Exception { Assert.assertEquals("Bulk update2 failed", stringifyValues(updatedData2), rs2); } + @Test + public void testInsertOverwriteWithSelfJoin() throws Exception { + int[][] part1Data = {{1,7}}; + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) " + makeValuesClause(part1Data)); + //this works because logically we need S lock on NONACIDORCTBL to read and X lock to write, but + //LockRequestBuilder dedups locks on the same entity to only keep the highest level lock requested + runStatementOnDriver("insert overwrite table " + Table.NONACIDORCTBL + " select 2, 9 from " + Table.NONACIDORCTBL + " T inner join " + Table.NONACIDORCTBL + " S on T.a=S.a"); + List rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL + " order by a,b"); + int[][] joinData = {{2,9}}; + Assert.assertEquals("Self join non-part insert overwrite failed", stringifyValues(joinData), rs); + int[][] part2Data = {{1,8}}; + runStatementOnDriver("insert into " + Table.NONACIDPART + " partition(p=1) (a,b) " + makeValuesClause(part1Data)); + runStatementOnDriver("insert into " + Table.NONACIDPART + " partition(p=2) (a,b) " + makeValuesClause(part2Data)); + //here we need X lock on p=1 partition to write and S lock on 'table' to read which should + //not block each other since they are part of the same txn + runStatementOnDriver("insert overwrite table " + Table.NONACIDPART + " partition(p=1) select a,b from " + Table.NONACIDPART); + List rs2 = runStatementOnDriver("select a,b from " + Table.NONACIDPART + " order by a,b"); + int[][] updatedData = {{1,7},{1,8},{1,8}}; + Assert.assertEquals("Insert overwrite partition failed", stringifyValues(updatedData), rs2); + //insert overwrite not supported for ACID tables + } /** * takes raw data and turns it into a string as if from Driver.getResults() * sorts rows in dictionary order