diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java index 5f51cf5..efcf2e1 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java @@ -400,16 +400,7 @@ public static void cleanDb(Configuration conf) throws Exception { stmt = conn.createStatement(); // We want to try these, whether they succeed or fail. - try { - stmt.execute("DROP INDEX HL_TXNID_INDEX"); - } catch (SQLException e) { - if(!("42X65".equals(e.getSQLState()) && 30000 == e.getErrorCode())) { - //42X65/3000 means index doesn't exist - LOG.error("Unable to drop index HL_TXNID_INDEX " + e.getMessage() + - "State=" + e.getSQLState() + " code=" + e.getErrorCode() + " retryCount=" + retryCount); - success = false; - } - } + success &= dropIndex(stmt, "HL_TXNID_INDEX", retryCount); success &= dropTable(stmt, "TXN_COMPONENTS", retryCount); success &= dropTable(stmt, "COMPLETED_TXN_COMPONENTS", retryCount); @@ -443,6 +434,20 @@ public static void cleanDb(Configuration conf) throws Exception { throw new RuntimeException("Failed to clean up txn tables"); } + private static boolean dropIndex(Statement stmt, String index, int retryCount) { + try { + stmt.execute("DROP INDEX " + index); + } catch (SQLException e) { + if (!("42X65".equals(e.getSQLState()) && 30000 == e.getErrorCode())) { + //42X65/30000 means index doesn't exist + LOG.error("Unable to drop index {} {} State={} code={} retryCount={}", + index, e.getMessage(), e.getSQLState(), e.getErrorCode(), retryCount); + return false; + } + } + return true; + } + private static boolean dropTable(Statement stmt, String name, int retryCount) throws SQLException { for (int i = 0; i < 3; i++) { try { diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 2995afa..707e981 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -41,8 +41,6 @@ import java.util.Objects; import java.util.Properties; import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -324,7 +322,6 @@ public void setConf(Configuration conf) { numOpenTxns = Metrics.getOrCreateGauge(MetricsConstants.NUM_OPEN_TXNS); timeout = MetastoreConf.getTimeVar(conf, ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS); - buildJumpTable(); retryInterval = MetastoreConf.getTimeVar(conf, ConfVars.HMS_HANDLER_INTERVAL, TimeUnit.MILLISECONDS); retryLimit = MetastoreConf.getIntVar(conf, ConfVars.HMS_HANDLER_ATTEMPTS); @@ -2608,7 +2605,7 @@ private static String normalizeCase(String s) { } private LockResponse checkLockWithRetry(Connection dbConn, long extLockId, long txnId) - throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException { + throws NoSuchLockException, TxnAbortedException, MetaException { try { try { lockInternal(); @@ -2616,7 +2613,7 @@ private LockResponse checkLockWithRetry(Connection dbConn, long extLockId, long //should only get here if retrying this op dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); } - return checkLock(dbConn, extLockId); + return checkLock(dbConn, extLockId, txnId); } catch (SQLException e) { LOG.debug("Going to rollback"); rollbackDBConn(dbConn); @@ -2676,7 +2673,7 @@ public LockResponse checkLock(CheckLockRequest rqst) //todo: strictly speaking there is a bug here. 
heartbeat*() commits but both heartbeat and //checkLock() are in the same retry block, so if checkLock() throws, heartbeat is also retired //extra heartbeat is logically harmless, but ... - return checkLock(dbConn, extLockId); + return checkLock(dbConn, extLockId, info.txnId); } catch (SQLException e) { LOG.debug("Going to rollback"); rollbackDBConn(dbConn); @@ -4310,8 +4307,8 @@ private static boolean isValidTxn(long txnId) { * checkLock() will in the worst case keep locks in Waiting state a little longer. */ @RetrySemantics.SafeToRetry("See @SafeToRetry") - private LockResponse checkLock(Connection dbConn, long extLockId) - throws NoSuchLockException, TxnAbortedException, MetaException, SQLException { + private LockResponse checkLock(Connection dbConn, long extLockId, long txnId) + throws NoSuchLockException, TxnAbortedException, MetaException, SQLException { Statement stmt = null; ResultSet rs = null; LockResponse response = new LockResponse(); @@ -4327,21 +4324,13 @@ private LockResponse checkLock(Connection dbConn, long extLockId) */ boolean isPartOfDynamicPartitionInsert = true; try { - List locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId);//being acquired now + List locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId); //being acquired now response.setLockid(extLockId); - LOG.debug("checkLock(): Setting savepoint. 
extLockId=" + JavaUtils.lockIdToString(extLockId)); - - StringBuilder query = new StringBuilder("SELECT \"HL_LOCK_EXT_ID\", " + - "\"HL_LOCK_INT_ID\", \"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\", \"HL_LOCK_STATE\", " + - "\"HL_LOCK_TYPE\", \"HL_TXNID\" FROM \"HIVE_LOCKS\" WHERE \"HL_DB\" IN ("); - - Set strings = new HashSet<>(locksBeingChecked.size()); //This the set of entities that the statement represented by extLockId wants to update List writeSet = new ArrayList<>(); for (LockInfo info : locksBeingChecked) { - strings.add(info.db); if(!isPartOfDynamicPartitionInsert && info.type == LockType.SHARED_WRITE) { writeSet.add(info); } @@ -4399,155 +4388,80 @@ private LockResponse checkLock(Connection dbConn, long extLockId) close(rs, stmt, null); } - boolean first = true; - for (String s : strings) { - if (first) first = false; - else query.append(", "); - query.append('\''); - query.append(s); - query.append('\''); - } - query.append(")"); - - // If any of the table requests are null, then I need to pull all the - // table locks for this db. - boolean sawNull = false; - strings.clear(); - for (LockInfo info : locksBeingChecked) { - if (info.table == null) { - sawNull = true; - break; - } else { - strings.add(info.table); - } - } - if (!sawNull) { - query.append(" AND (\"HL_TABLE\" IS NULL OR \"HL_TABLE\" IN("); - first = true; - for (String s : strings) { - if (first) first = false; - else query.append(", "); - query.append('\''); - query.append(s); - query.append('\''); - } - query.append("))"); - - // If any of the partition requests are null, then I need to pull all - // partition locks for this table. 
- sawNull = false; - strings.clear(); - for (LockInfo info : locksBeingChecked) { - if (info.partition == null) { - sawNull = true; - break; - } else { - strings.add(info.partition); - } - } - if (!sawNull) { - query.append(" AND (\"HL_PARTITION\" IS NULL OR \"HL_PARTITION\" IN("); - first = true; - for (String s : strings) { - if (first) first = false; - else query.append(", "); - query.append('\''); - query.append(s); - query.append('\''); - } - query.append("))"); - } - } - query.append(" AND \"HL_LOCK_EXT_ID\" < ").append(extLockId); + String queryStr = + " \"EX\".*, \"REQ\".\"HL_LOCK_INT_ID\" AS \"REQ_LOCK_INT_ID\" FROM (" + + " SELECT \"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_TXNID\", \"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\"," + + " \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\" FROM \"HIVE_LOCKS\"" + + " WHERE \"HL_LOCK_EXT_ID\" < " + extLockId + ") \"EX\"" + + " INNER JOIN (" + + " SELECT \"HL_LOCK_INT_ID\", \"HL_TXNID\", \"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\"," + + " \"HL_LOCK_TYPE\" FROM \"HIVE_LOCKS\"" + + " WHERE \"HL_LOCK_EXT_ID\" = " + extLockId + ") \"REQ\"" + + " ON \"EX\".\"HL_DB\" = \"REQ\".\"HL_DB\"" + + " AND (\"EX\".\"HL_TABLE\" IS NULL OR \"REQ\".\"HL_TABLE\" IS NULL" + + " OR \"EX\".\"HL_TABLE\" = \"REQ\".\"HL_TABLE\"" + + " AND (\"EX\".\"HL_PARTITION\" IS NULL OR \"REQ\".\"HL_PARTITION\" IS NULL" + + " OR \"EX\".\"HL_PARTITION\" = \"REQ\".\"HL_PARTITION\"))" + + /*different locks from same txn should not conflict with each other, + txnId=0 means it's a select or IUD which does not write to ACID table*/ + " WHERE (\"REQ\".\"HL_TXNID\" = 0 OR \"EX\".\"HL_TXNID\" != \"REQ\".\"HL_TXNID\")" + + " AND "; + + /*EXCLUSIVE lock on partition should prevent SHARED_READ on the table, however there is no reason + for an EXCLUSIVE on a table to prevent SHARED_READ on a database. Similarly, EXCLUSIVE on a partition + should not conflict with SHARED_READ on a database. 
+ SHARED_READ is usually acquired on a database to make sure it's not dropped, while some operation + is performed on that db (e.g. show tables, created table, etc). + EXCLUSIVE on an object may mean it's being dropped or overwritten.*/ + String[] whereStr = { + // exclusive + " \"REQ\".\"HL_LOCK_TYPE\"='e'" + + " AND NOT (\"EX\".\"HL_TABLE\" IS NULL AND \"EX\".\"HL_LOCK_TYPE\"='r' AND \"REQ\".\"HL_TABLE\" IS NOT NULL)", + // shared-write + " \"REQ\".\"HL_LOCK_TYPE\"='w' AND \"EX\".\"HL_LOCK_TYPE\" IN ('w','e')", + // shared-read + " \"REQ\".\"HL_LOCK_TYPE\"='r' AND \"EX\".\"HL_LOCK_TYPE\"='e'" + + " AND NOT (\"EX\".\"HL_TABLE\" IS NOT NULL AND \"REQ\".\"HL_TABLE\" IS NULL)", + }; + + List subQuery = new ArrayList<>(); + for (String subCond : whereStr) { + subQuery.add("(" + sqlGenerator.addLimitClause(1, queryStr + subCond) + ")"); + } + String query = String.join(" UNION ALL ", subQuery); - LOG.debug("Going to execute query <" + query.toString() + ">"); stmt = dbConn.createStatement(); - rs = stmt.executeQuery(query.toString()); - SortedSet lockSet = new TreeSet(new LockInfoComparator()); - while (rs.next()) { - lockSet.add(new LockInfo(rs)); - } - // Turn the tree set into an array so we can move back and forth easily - // in it. - LockInfo[] locks = lockSet.toArray(new LockInfo[lockSet.size()]); - if(LOG.isTraceEnabled()) { - LOG.trace("Locks to check(full): "); - for(LockInfo info : locks) { - LOG.trace(" " + info); - } - } + LOG.debug("Going to execute query <" + query + ">"); + rs = stmt.executeQuery(query); - for (LockInfo info : locksBeingChecked) { - // If we've found it and it's already been marked acquired, - // then just look at the other locks. - if (info.state == LockState.ACQUIRED) { - /**this is what makes this method @SafeToRetry*/ - continue; - } + if (rs.next()) { + // We acquire all locks for a given query atomically; if 1 blocks, all remain in Waiting state. 
+ LockInfo blockedBy = new LockInfo(rs); + long intLockId = rs.getLong("REQ_LOCK_INT_ID"); - // Look at everything in front of this lock to see if it should block - // it or not. - for (int i = locks.length - 1; i >= 0; i--) { - // Check if we're operating on the same database, if not, move on - if (!info.db.equals(locks[i].db)) { - continue; - } + String sqlText = "UPDATE \"HIVE_LOCKS\"" + + " SET \"HL_BLOCKEDBY_EXT_ID\" = " + blockedBy.extLockId + + ", \"HL_BLOCKEDBY_INT_ID\" = " + blockedBy.intLockId + + " WHERE \"HL_LOCK_EXT_ID\" = " + extLockId + " AND \"HL_LOCK_INT_ID\" = " + intLockId; - // If table is null on either of these, then they are claiming to - // lock the whole database and we need to check it. Otherwise, - // check if they are operating on the same table, if not, move on. - if (info.table != null && locks[i].table != null - && !info.table.equals(locks[i].table)) { - continue; - } - // if here, we may be checking a DB level lock against a Table level lock. Alternatively, - // we could have used Intention locks (for example a request for S lock on table would - // cause an IS lock DB that contains the table). Similarly, at partition level. - - // If partition is null on either of these, then they are claiming to - // lock the whole table and we need to check it. Otherwise, - // check if they are operating on the same partition, if not, move on. - if (info.partition != null && locks[i].partition != null - && !info.partition.equals(locks[i].partition)) { - continue; - } + LOG.debug("Executing sql: " + sqlText); + int updCnt = stmt.executeUpdate(sqlText); - // We've found something that matches what we're trying to lock, - // so figure out if we can lock it too. 
- LockAction lockAction = jumpTable.get(info.type).get(locks[i].type).get(locks[i].state); - LOG.debug("desired Lock: " + info + " checked Lock: " + locks[i] + " action: " + lockAction); - switch (lockAction) { - case WAIT: - if(!ignoreConflict(info, locks[i])) { - String sqlText = "UPDATE \"HIVE_LOCKS\"" + - " SET \"HL_BLOCKEDBY_EXT_ID\"=" + locks[i].extLockId + - ", \"HL_BLOCKEDBY_INT_ID\"=" + locks[i].intLockId + - " WHERE \"HL_LOCK_EXT_ID\"=" + info.extLockId + " AND \"HL_LOCK_INT_ID\"=" + info.intLockId; - LOG.debug("Executing sql: " + sqlText); - int updCnt = stmt.executeUpdate(sqlText); - if(updCnt != 1) { - shouldNeverHappen(info.txnId, info.extLockId, info.intLockId); - } - LOG.debug("Going to commit"); - dbConn.commit(); - response.setState(LockState.WAITING); - LOG.debug("Lock(" + info + ") waiting for Lock(" + locks[i] + ")"); - return response; - } - //fall through to ACQUIRE - case ACQUIRE: - break; - case KEEP_LOOKING: - continue; - } - //if we got here, it means it's ok to acquire 'info' lock - break;// so exit the loop and check next lock + if (updCnt != 1) { + shouldNeverHappen(txnId, extLockId, intLockId); } + LOG.debug("Going to commit"); + dbConn.commit(); + + LOG.debug("Lock({} intLockId:{} {}) is blocked by Lock({})", JavaUtils.lockIdToString(extLockId), + intLockId, JavaUtils.txnIdToString(txnId), blockedBy); + response.setState(LockState.WAITING); + return response; } - //if here, ther were no locks that blocked any locks in 'locksBeingChecked' - acquire them all + // If here, there were no locks that would block any item from 'locksBeingChecked' - acquire them all acquire(dbConn, stmt, locksBeingChecked); - // We acquired all of the locks, so commit and return acquired. + // We acquired all the locks, so commit and return acquired. 
LOG.debug("Going to commit"); dbConn.commit(); response.setState(LockState.ACQUIRED); @@ -4556,6 +4470,7 @@ private LockResponse checkLock(Connection dbConn, long extLockId) } return response; } + private void acquire(Connection dbConn, Statement stmt, List locksBeingChecked) throws SQLException, NoSuchLockException, MetaException { if(locksBeingChecked == null || locksBeingChecked.isEmpty()) { @@ -5109,114 +5024,6 @@ private static synchronized DataSource setupJdbcConnectionPool(Configuration con } } - private static synchronized void buildJumpTable() { - if (jumpTable != null) return; - - jumpTable = new HashMap<>(3); - - // SR: Lock we are trying to acquire is shared read - Map> m = new HashMap<>(3); - jumpTable.put(LockType.SHARED_READ, m); - - // SR.SR: Lock we are examining is shared read - Map m2 = new HashMap<>(2); - m.put(LockType.SHARED_READ, m2); - - // SR.SR.acquired Lock we are examining is acquired; We can acquire - // because two shared reads can acquire together and there must be - // nothing in front of this one to prevent acquisition. - m2.put(LockState.ACQUIRED, LockAction.ACQUIRE); - - // SR.SR.wait Lock we are examining is waiting. In this case we keep - // looking, as it's possible that something in front is blocking it or - // that the other locker hasn't checked yet and he could lock as well. - m2.put(LockState.WAITING, LockAction.KEEP_LOOKING); - - // SR.SW: Lock we are examining is shared write - m2 = new HashMap<>(2); - m.put(LockType.SHARED_WRITE, m2); - - // SR.SW.acquired Lock we are examining is acquired; We can acquire - // because a read can share with a write, and there must be - // nothing in front of this one to prevent acquisition. - m2.put(LockState.ACQUIRED, LockAction.ACQUIRE); - - // SR.SW.wait Lock we are examining is waiting. 
In this case we keep - // looking, as it's possible that something in front is blocking it or - // that the other locker hasn't checked yet and he could lock as well or - // that something is blocking it that would not block a read. - m2.put(LockState.WAITING, LockAction.KEEP_LOOKING); - - // SR.E: Lock we are examining is exclusive - m2 = new HashMap<>(2); - m.put(LockType.EXCLUSIVE, m2); - - // No matter whether it has acquired or not, we cannot pass an exclusive. - m2.put(LockState.ACQUIRED, LockAction.WAIT); - m2.put(LockState.WAITING, LockAction.WAIT); - - // SW: Lock we are trying to acquire is shared write - m = new HashMap<>(3); - jumpTable.put(LockType.SHARED_WRITE, m); - - // SW.SR: Lock we are examining is shared read - m2 = new HashMap<>(2); - m.put(LockType.SHARED_READ, m2); - - // SW.SR.acquired Lock we are examining is acquired; We need to keep - // looking, because there may or may not be another shared write in front - // that would block us. - m2.put(LockState.ACQUIRED, LockAction.KEEP_LOOKING); - - // SW.SR.wait Lock we are examining is waiting. In this case we keep - // looking, as it's possible that something in front is blocking it or - // that the other locker hasn't checked yet and he could lock as well. - m2.put(LockState.WAITING, LockAction.KEEP_LOOKING); - - // SW.SW: Lock we are examining is shared write - m2 = new HashMap<>(2); - m.put(LockType.SHARED_WRITE, m2); - - // Regardless of acquired or waiting, one shared write cannot pass another. - m2.put(LockState.ACQUIRED, LockAction.WAIT); - m2.put(LockState.WAITING, LockAction.WAIT); - - // SW.E: Lock we are examining is exclusive - m2 = new HashMap<>(2); - m.put(LockType.EXCLUSIVE, m2); - - // No matter whether it has acquired or not, we cannot pass an exclusive. 
- m2.put(LockState.ACQUIRED, LockAction.WAIT); - m2.put(LockState.WAITING, LockAction.WAIT); - - // E: Lock we are trying to acquire is exclusive - m = new HashMap<>(3); - jumpTable.put(LockType.EXCLUSIVE, m); - - // E.SR: Lock we are examining is shared read - m2 = new HashMap<>(2); - m.put(LockType.SHARED_READ, m2); - - // Exclusives can never pass - m2.put(LockState.ACQUIRED, LockAction.WAIT); - m2.put(LockState.WAITING, LockAction.WAIT); - - // E.SW: Lock we are examining is shared write - m2 = new HashMap<>(2); - m.put(LockType.SHARED_WRITE, m2); - - // Exclusives can never pass - m2.put(LockState.ACQUIRED, LockAction.WAIT); - m2.put(LockState.WAITING, LockAction.WAIT); - - // E.E: Lock we are examining is exclusive - m2 = new HashMap<>(2); - m.put(LockType.EXCLUSIVE, m2); - - // No matter whether it has acquired or not, we cannot pass an exclusive. - m2.put(LockState.ACQUIRED, LockAction.WAIT); - m2.put(LockState.WAITING, LockAction.WAIT); - } /** * Returns true if {@code ex} should be retried */