diff --git ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java index 01e5b82401..6f71b98504 100644 --- ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java +++ ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java @@ -56,8 +56,8 @@ import org.apache.hadoop.hive.metastore.api.TxnInfo; import org.apache.hadoop.hive.metastore.api.TxnOpenException; import org.apache.hadoop.hive.metastore.api.TxnState; -import org.apache.hadoop.hive.metastore.api.UnlockRequest; import org.apache.hadoop.hive.metastore.api.TxnToWriteId; +import org.apache.hadoop.hive.metastore.api.UnlockRequest; import org.apache.hadoop.util.StringUtils; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; @@ -75,6 +75,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; @@ -957,6 +958,43 @@ public void testMultipleLock() throws Exception { assertEquals(0, txnHandler.numLocksInLockTable()); } + @Test + public void testBatching() throws Exception { + List components = new ArrayList(2); + for (int i = 0; i < 5000; i++) { + LockComponent comp = + new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb"); + comp.setTablename("mytable"); + comp.setPartitionname("mypartition_" + i); + comp.setOperationType(DataOperationType.NO_TXN); + components.add(comp); + } + + // set batch size to 1000 explicitly + TxnHandler handler = (TxnHandler) txnHandler; + int prevBatchSize = handler.getBatchSize(); + handler.setBatchSize(1000); + + LockRequest req = new LockRequest(components, "me", "localhost"); + LockResponse res = txnHandler.lock(req); + long lockid = res.getLockid(); + assertTrue(res.getState() == LockState.ACQUIRED); + + Connection dbConn = handler.getDbConn(Connection.TRANSACTION_READ_COMMITTED); + List 
locksBeingChecked = handler.getLockInfoFromLockId(dbConn,lockid); + assertEquals(5000, locksBeingChecked.size()); + List partsList = handler.getLocksBeingHeldForPartitions(locksBeingChecked); + assertEquals(5000, partsList.size()); + + res = txnHandler.checkLock(new CheckLockRequest(lockid)); + assertTrue(res.getState() == LockState.ACQUIRED); + txnHandler.unlock(new UnlockRequest(lockid)); + assertEquals(0, txnHandler.numLocksInLockTable()); + + //reset batch size. + handler.setBatchSize(prevBatchSize); + } + @Test public void testMultipleLockWait() throws Exception { // Test that two shared read locks can share a partition diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 64a542997b..d682da9c20 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -47,11 +47,13 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; import javax.sql.DataSource; +import com.google.common.base.Throwables; import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.NotImplementedException; import org.apache.hadoop.classification.InterfaceAudience; @@ -64,6 +66,7 @@ import org.apache.hadoop.hive.common.ValidTxnWriteIdList; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.common.classification.RetrySemantics; +import org.apache.hadoop.hive.metastore.Batchable; import org.apache.hadoop.hive.metastore.DatabaseProduct; import org.apache.hadoop.hive.metastore.Warehouse; import 
org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier; @@ -186,6 +189,8 @@ private static DataSource connPoolMutex; static private boolean doRetryOnConnPool = false; + private int batchSize; + private List transactionalListeners; /** @@ -319,6 +324,12 @@ public void setConf(Configuration conf) { closeDbConn(dbConn); } } + this.batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_PARTITION_BATCH_SIZE); + if (this.batchSize == 0) { + // verify DB product + this.batchSize = DatabaseProduct.needsInBatching(dbProduct) ? 1000 : -1; + } + LOG.info("batchSize set to: {}", batchSize); } numOpenTxns = Metrics.getOrCreateGauge(MetricsConstants.NUM_OPEN_TXNS); @@ -342,6 +353,17 @@ public void setConf(Configuration conf) { } } + @VisibleForTesting + void setBatchSize(int size) { + LOG.info("Setting batch size to {}", size); + this.batchSize = size; + } + + @VisibleForTesting + int getBatchSize() { + return batchSize; + } + @Override public Configuration getConf() { return conf; @@ -4281,6 +4303,21 @@ private int abortTxns(Connection dbConn, List txnids, long max_heartbeat, private static boolean isValidTxn(long txnId) { return txnId != 0; } + + @VisibleForTesting + List getLocksBeingHeldForPartitions(List locksBeingChecked) { + Set lockStrings = new HashSet<>(); + for (LockInfo info : locksBeingChecked) { + if (info.partition == null) { + lockStrings.clear(); + break; + } else { + lockStrings.add(info.partition); + } + } + return new ArrayList<>(lockStrings); + } + /** * Lock acquisition is meant to be fair, so every lock can only block on some lock with smaller * hl_lock_ext_id by only checking earlier locks. 
@@ -4296,11 +4333,158 @@ private static boolean isValidTxn(long txnId) { */ @RetrySemantics.SafeToRetry("See @SafeToRetry") private LockResponse checkLock(Connection dbConn, long extLockId) - throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException, SQLException { + throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException, SQLException { + + LockResponse response = new LockResponse(); TxnStore.MutexAPI.LockHandle handle = null; Statement stmt = null; + + List locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId);//being acquired now + + // get all partitions needed for batching + List locksForPartitions = getLocksBeingHeldForPartitions(locksBeingChecked); + + try { + /** + * checkLock() must be mutex'd against any other checkLock to make sure 2 conflicting locks + * are not granted by parallel checkLock() calls. + */ + handle = getMutexAPI().acquireLock(MUTEX_KEY.CheckLock.name()); + LOG.debug("checkLock(): Setting savepoint. extLockId=" + JavaUtils.lockIdToString(extLockId)); + Savepoint save = dbConn.setSavepoint(); + response.setLockid(extLockId); + + stmt = dbConn.createStatement(); + SortedSet lockSet = new TreeSet<>(new LockInfoComparator()); + final AtomicReference stmtRef = new AtomicReference<>(stmt); + + // do batching here. 
+ Batchable batch = new Batchable() { + + @Override public List run(List input) + throws MetaException { + Set lockInfoSet = new HashSet<>(input); + List lockInfo = new ArrayList<>(input.size()); + try { + lockInfo.addAll( + checkLock(dbConn, extLockId, lockInfoSet, + stmtRef.get(), locksBeingChecked)); + } catch (TxnAbortedException | SQLException e) { + LOG.info("Error in executing batch for inputs: " + input, e); + throw new MetaException(Throwables.getStackTraceAsString(e)); + } + return lockInfo; + } + }; + + // Batching here for partitions + lockSet.addAll(Batchable.runBatched(batchSize, locksForPartitions, + batch)); + + // Turn the tree set into an array so we can move back and forth easily + // in it. + LockInfo[] locks = lockSet.toArray(new LockInfo[lockSet.size()]); + if(LOG.isTraceEnabled()) { + LOG.trace("Locks to check(full): "); + for(LockInfo info : locks) { + LOG.trace(" " + info); + } + } + + for (LockInfo info : locksBeingChecked) { + // If we've found it and it's already been marked acquired, + // then just look at the other locks. + if (info.state == LockState.ACQUIRED) { + /**this is what makes this method @SafeToRetry*/ + continue; + } + + // Look at everything in front of this lock to see if it should block + // it or not. + for (int i = locks.length - 1; i >= 0; i--) { + // Check if we're operating on the same database, if not, move on + if (!info.db.equals(locks[i].db)) { + continue; + } + + // If table is null on either of these, then they are claiming to + // lock the whole database and we need to check it. Otherwise, + // check if they are operating on the same table, if not, move on. + if (info.table != null && locks[i].table != null + && !info.table.equals(locks[i].table)) { + continue; + } + // if here, we may be checking a DB level lock against a Table level lock. Alternatively, + // we could have used Intention locks (for example a request for S lock on table would + // cause an IS lock DB that contains the table). 
Similarly, at partition level. + + // If partition is null on either of these, then they are claiming to + // lock the whole table and we need to check it. Otherwise, + // check if they are operating on the same partition, if not, move on. + if (info.partition != null && locks[i].partition != null + && !info.partition.equals(locks[i].partition)) { + continue; + } + + // We've found something that matches what we're trying to lock, + // so figure out if we can lock it too. + LockAction lockAction = jumpTable.get(info.type).get(locks[i].type).get(locks[i].state); + LOG.debug("desired Lock: " + info + " checked Lock: " + locks[i] + " action: " + lockAction); + switch (lockAction) { + case WAIT: + if(!ignoreConflict(info, locks[i])) { + /*we acquire all locks for a given query atomically; if 1 blocks, all go into (remain) in + * Waiting state. wait() will undo any 'acquire()' which may have happened as part of + * this (metastore db) transaction and then we record which lock blocked the lock + * we were testing ('info').*/ + wait(dbConn, save); + String sqlText = "update HIVE_LOCKS" + + " set HL_BLOCKEDBY_EXT_ID=" + locks[i].extLockId + + ", HL_BLOCKEDBY_INT_ID=" + locks[i].intLockId + + " where HL_LOCK_EXT_ID=" + info.extLockId + " and HL_LOCK_INT_ID=" + info.intLockId; + LOG.debug("Executing sql: " + sqlText); + int updCnt = stmt.executeUpdate(sqlText); + if(updCnt != 1) { + shouldNeverHappen(info.txnId, info.extLockId, info.intLockId); + } + LOG.debug("Going to commit"); + dbConn.commit(); + response.setState(LockState.WAITING); + LOG.debug("Lock(" + info + ") waiting for Lock(" + locks[i] + ")"); + return response; + } + //fall through to ACQUIRE + case ACQUIRE: + break; + case KEEP_LOOKING: + continue; + } + //if we got here, it means it's ok to acquire 'info' lock + break;// so exit the loop and check next lock + } + } + //if here, there were no locks that blocked any locks in 'locksBeingChecked' - acquire them all + acquire(dbConn, stmt, locksBeingChecked); + + 
// We acquired all of the locks, so commit and return acquired. + LOG.debug("Going to commit"); + dbConn.commit(); + response.setState(LockState.ACQUIRED); + } finally { + close(null, stmt, dbConn); + if(handle != null) { + handle.releaseLocks(); + } + } + return response; + + } + + private Set checkLock(Connection dbConn, long extLockId, + Set locksForPartitions, Statement stmt, + List locksBeingChecked) throws TxnAbortedException, MetaException, SQLException { ResultSet rs = null; - LockResponse response = new LockResponse(); + SortedSet lockSet = new TreeSet(new LockInfoComparator()); /** * todo: Longer term we should pass this from client somehow - this would be an optimization; once * that is in place make sure to build and test "writeSet" below using OperationType not LockType @@ -4313,16 +4497,6 @@ private LockResponse checkLock(Connection dbConn, long extLockId) */ boolean isPartOfDynamicPartitionInsert = true; try { - /** - * checkLock() must be mutex'd against any other checkLock to make sure 2 conflicting locks - * are not granted by parallel checkLock() calls. - */ - handle = getMutexAPI().acquireLock(MUTEX_KEY.CheckLock.name()); - List locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId);//being acquired now - response.setLockid(extLockId); - - LOG.debug("checkLock(): Setting savepoint. 
extLockId=" + JavaUtils.lockIdToString(extLockId)); - Savepoint save = dbConn.setSavepoint(); StringBuilder query = new StringBuilder("select hl_lock_ext_id, " + "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " + "hl_lock_type, hl_txnid from HIVE_LOCKS where hl_db in ("); @@ -4343,7 +4517,6 @@ private LockResponse checkLock(Connection dbConn, long extLockId) //Write operation always start a txn throw new IllegalStateException("Found Write lock for " + JavaUtils.lockIdToString(extLockId) + " but no txnid"); } - stmt = dbConn.createStatement(); StringBuilder sb = new StringBuilder(" ws_database, ws_table, ws_partition, " + "ws_txnid, ws_commit_id " + "from WRITE_SET where ws_commit_id >= " + writeSet.get(0).txnId + " and (");//see commitTxn() for more info on this inequality @@ -4425,24 +4598,15 @@ private LockResponse checkLock(Connection dbConn, long extLockId) } query.append("))"); - // If any of the partition requests are null, then I need to pull all - // partition locks for this table. 
- sawNull = false; - strings.clear(); - for (LockInfo info : locksBeingChecked) { - if (info.partition == null) { - sawNull = true; - break; - } else { - strings.add(info.partition); - } - } - if (!sawNull) { + if (!locksForPartitions.isEmpty()) { query.append(" and (hl_partition is null or hl_partition in("); first = true; - for (String s : strings) { - if (first) first = false; - else query.append(", "); + for (String s : locksForPartitions) { + if (first) { + first = false; + } else { + query.append(", "); + } query.append('\''); query.append(s); query.append('\''); @@ -4455,106 +4619,13 @@ private LockResponse checkLock(Connection dbConn, long extLockId) LOG.debug("Going to execute query <" + query.toString() + ">"); stmt = dbConn.createStatement(); rs = stmt.executeQuery(query.toString()); - SortedSet lockSet = new TreeSet(new LockInfoComparator()); while (rs.next()) { lockSet.add(new LockInfo(rs)); } - // Turn the tree set into an array so we can move back and forth easily - // in it. - LockInfo[] locks = lockSet.toArray(new LockInfo[lockSet.size()]); - if(LOG.isTraceEnabled()) { - LOG.trace("Locks to check(full): "); - for(LockInfo info : locks) { - LOG.trace(" " + info); - } - } - - for (LockInfo info : locksBeingChecked) { - // If we've found it and it's already been marked acquired, - // then just look at the other locks. - if (info.state == LockState.ACQUIRED) { - /**this is what makes this method @SafeToRetry*/ - continue; - } - - // Look at everything in front of this lock to see if it should block - // it or not. - for (int i = locks.length - 1; i >= 0; i--) { - // Check if we're operating on the same database, if not, move on - if (!info.db.equals(locks[i].db)) { - continue; - } - - // If table is null on either of these, then they are claiming to - // lock the whole database and we need to check it. Otherwise, - // check if they are operating on the same table, if not, move on. 
- if (info.table != null && locks[i].table != null - && !info.table.equals(locks[i].table)) { - continue; - } - // if here, we may be checking a DB level lock against a Table level lock. Alternatively, - // we could have used Intention locks (for example a request for S lock on table would - // cause an IS lock DB that contains the table). Similarly, at partition level. - - // If partition is null on either of these, then they are claiming to - // lock the whole table and we need to check it. Otherwise, - // check if they are operating on the same partition, if not, move on. - if (info.partition != null && locks[i].partition != null - && !info.partition.equals(locks[i].partition)) { - continue; - } - - // We've found something that matches what we're trying to lock, - // so figure out if we can lock it too. - LockAction lockAction = jumpTable.get(info.type).get(locks[i].type).get(locks[i].state); - LOG.debug("desired Lock: " + info + " checked Lock: " + locks[i] + " action: " + lockAction); - switch (lockAction) { - case WAIT: - if(!ignoreConflict(info, locks[i])) { - /*we acquire all locks for a given query atomically; if 1 blocks, all go into (remain) in - * Waiting state. 
wait() will undo any 'acquire()' which may have happened as part of - * this (metastore db) transaction and then we record which lock blocked the lock - * we were testing ('info').*/ - wait(dbConn, save); - String sqlText = "update HIVE_LOCKS" + - " set HL_BLOCKEDBY_EXT_ID=" + locks[i].extLockId + - ", HL_BLOCKEDBY_INT_ID=" + locks[i].intLockId + - " where HL_LOCK_EXT_ID=" + info.extLockId + " and HL_LOCK_INT_ID=" + info.intLockId; - LOG.debug("Executing sql: " + sqlText); - int updCnt = stmt.executeUpdate(sqlText); - if(updCnt != 1) { - shouldNeverHappen(info.txnId, info.extLockId, info.intLockId); - } - LOG.debug("Going to commit"); - dbConn.commit(); - response.setState(LockState.WAITING); - LOG.debug("Lock(" + info + ") waiting for Lock(" + locks[i] + ")"); - return response; - } - //fall through to ACQUIRE - case ACQUIRE: - break; - case KEEP_LOOKING: - continue; - } - //if we got here, it means it's ok to acquire 'info' lock - break;// so exit the loop and check next lock - } - } - //if here, ther were no locks that blocked any locks in 'locksBeingChecked' - acquire them all - acquire(dbConn, stmt, locksBeingChecked); - - // We acquired all of the locks, so commit and return acquired. 
- LOG.debug("Going to commit"); - dbConn.commit(); - response.setState(LockState.ACQUIRED); } finally { - close(rs, stmt, null); - if(handle != null) { - handle.releaseLocks(); - } + close(rs, null, null); } - return response; + return lockSet; } private void acquire(Connection dbConn, Statement stmt, List locksBeingChecked) throws SQLException, NoSuchLockException, MetaException { @@ -4897,7 +4968,8 @@ private LockInfo getTxnIdFromLockId(Connection dbConn, long extLockId) } // NEVER call this function without first calling heartbeat(long, long) - private List getLockInfoFromLockId(Connection dbConn, long extLockId) + @VisibleForTesting + List getLockInfoFromLockId(Connection dbConn, long extLockId) throws NoSuchLockException, MetaException, SQLException { Statement stmt = null; try {