diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 70cbab7..da2b395 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -824,7 +824,7 @@ public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException { * but what abount markCleaned() which is called when table is had been deleted... */ public void markFailed(CompactionInfo ci) throws MetaException {//todo: this should not throw - //todo: this shoudl take "comment" as parameter to set in CC_META_INFO to provide some context for the failure + //todo: this should take "comment" as parameter to set in CC_META_INFO to provide some context for the failure try { Connection dbConn = null; Statement stmt = null; diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 79c4f7a..4906980 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -45,6 +45,8 @@ import java.sql.*; import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; /** * A handler to answer transaction related calls that come into the metastore @@ -60,6 +62,28 @@ * For locks that are part of transaction, we set this 0 (would rather set it to NULL but * Currently the DB schema has this NOT NULL) and only update/read heartbeat from corresponding * transaction in TXNS. + * + * In general there can be multiple metastores where this logic can execute, thus the DB is + * used to ensure proper mutexing of operations. + * Select ... 
For Update (or equivalent: either MsSql with(updlock) or actual Update stmt) is + * used to properly sequence operations. Most notably: + * 1. various sequence IDs are generated with aid of this mutex + * 2. ensuring that each (Hive) Transaction state is transitioned atomically. Transaction state + * includes its actual state (Open, Aborted) as well as its lock list/component list. Thus all + * per transaction ops, either start by update/delete of the relevant TXNS row or do S4U on that row. + * This allows almost all operations to run at READ_COMMITTED and minimizes DB deadlocks. + * 3. checkLock() - this is mutexed entirely since we must ensure that while we check if some lock + * can be granted, no other (strictly speaking "earlier") lock can change state. + * + * The exception to this is Derby which doesn't support proper S4U. Derby is always running embedded + * (this is the only supported configuration for Derby) + * in the same JVM as HiveMetaStoreHandler thus we use JVM wide lock to properly sequence the operations. + * + * {@link #derbyLock} + + * If we ever decide to run remote Derby server, according to + * https://db.apache.org/derby/docs/10.0/manuals/develop/develop78.html all transactions will be + * serialized, so that would also work though has not been tested. */ @InterfaceAudience.Private @InterfaceStability.Evolving @@ -112,11 +136,11 @@ private long retryInterval; private int retryLimit; private int retryNum; + /** + * Derby specific concurrency control + */ + private static final ReentrantLock derbyLock = new ReentrantLock(true); - // DEADLOCK DETECTION AND HANDLING - // A note to developers of this class. ALWAYS access HIVE_LOCKS before TXNS to avoid deadlock - // between simultaneous accesses. ALWAYS access TXN_COMPONENTS before HIVE_LOCKS . - // // Private methods should never catch SQLException and then throw MetaException. The public // methods depend on SQLException coming back so they can detect and handle deadlocks. 
Private // methods should only throw MetaException when they explicitly know there's a logic error and @@ -130,19 +154,29 @@ public TxnHandler() { } + /** + * This is logically part of c'tor and must be called prior to any other method. + * Not physically part of c'tor due to use of reflection + */ public void setConf(HiveConf conf) { this.conf = conf; checkQFileTestHack(); + Connection dbConn = null; // Set up the JDBC connection pool try { setupJdbcConnectionPool(conf); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + determineDatabaseProduct(dbConn); } catch (SQLException e) { String msg = "Unable to instantiate JDBC connection pooling, " + e.getMessage(); LOG.error(msg); throw new RuntimeException(e); } + finally { + closeDbConn(dbConn); + } timeout = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS); buildJumpTable(); @@ -276,29 +310,6 @@ public GetOpenTxnsResponse getOpenTxns() throws MetaException { return getOpenTxns(); } } - - /** - * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse} to a - * {@link org.apache.hadoop.hive.common.ValidTxnList}. This assumes that the caller intends to - * read the files, and thus treats both open and aborted transactions as invalid. - * @param txns txn list from the metastore - * @param currentTxn Current transaction that the user has open. If this is greater than 0 it - * will be removed from the exceptions list so that the user sees his own - * transaction as valid. - * @return a valid txn list. - */ - public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long currentTxn) { - long highWater = txns.getTxn_high_water_mark(); - Set open = txns.getOpen_txns(); - long[] exceptions = new long[open.size() - (currentTxn > 0 ? 
1 : 0)]; - int i = 0; - for(long txn: open) { - if (currentTxn > 0 && currentTxn == txn) continue; - exceptions[i++] = txn; - } - return new ValidReadTxnList(exceptions, highWater); - } - public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { int numTxns = rqst.getNum_txns(); try { @@ -306,11 +317,12 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { Statement stmt = null; ResultSet rs = null; try { + lock(); /** * To make {@link #getOpenTxns()}/{@link #getOpenTxnsInfo()} work correctly, this operation must ensure * that advancing the counter in NEXT_TXN_ID and adding appropriate entries to TXNS is atomic. - * Also, advancing the counter must work when multiple metastores are running, thus either - * SELECT ... FOR UPDATE is used or SERIALIZABLE isolation. The former is preferred since it prevents + * Also, advancing the counter must work when multiple metastores are running. + * SELECT ... FOR UPDATE is used to prevent * concurrent DB transactions being rolled back due to Write-Write conflict on NEXT_TXN_ID. * * In the current design, there can be several metastore instances running in a given Warehouse. @@ -323,14 +335,14 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { * Longer term we can consider running Active-Passive MS (at least wrt to ACID operations). This * set could support a write-through cache for added performance. */ - dbConn = getDbConn(getRequiredIsolationLevel()); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); // Make sure the user has not requested an insane amount of txns. 
int maxTxns = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH); if (numTxns > maxTxns) numTxns = maxTxns; stmt = dbConn.createStatement(); - String s = addForUpdateClause(dbConn, "select ntxn_next from NEXT_TXN_ID"); + String s = addForUpdateClause("select ntxn_next from NEXT_TXN_ID"); LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); if (!rs.next()) { @@ -366,6 +378,7 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { + StringUtils.stringifyException(e)); } finally { close(rs, stmt, dbConn); + unlock(); } } catch (RetryException e) { return openTxns(rqst); @@ -377,7 +390,8 @@ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaExcept try { Connection dbConn = null; try { - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + lock(); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); if (abortTxns(dbConn, Collections.singletonList(txnid)) != 1) { LOG.debug("Going to rollback"); dbConn.rollback(); @@ -394,6 +408,7 @@ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaExcept + StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); + unlock(); } } catch (RetryException e) { abortTxn(rqst); @@ -406,33 +421,24 @@ public void commitTxn(CommitTxnRequest rqst) try { Connection dbConn = null; Statement stmt = null; + ResultSet lockHandle = null; try { + lock(); /** - * This has to run at SERIALIZABLE to make no concurrent attempt to acquire locks (insert into HIVE_LOCKS) - * can happen. Otherwise we may end up with orphaned locks. While lock() and commitTxn() should not - * normally run concurrently (for same txn) but could due to bugs in the client which could then - * (w/o SERIALIZABLE) corrupt internal transaction manager state. Also competes with abortTxn() - * - * Sketch of an improvement: - * Introduce a new transaction state in TXNS, state 'c'. This is a transient Committed state. 
- * commitTxn() would mark the txn 'c' in TXNS in an independent txn. Other operation like - * lock(), heartbeat(), etc would raise errors for txn in 'c' state and getOpenTxns(), etc would - * treat 'c' txn as 'open'. Then this method could run in READ COMMITTED since the - * entry for this txn in TXNS in 'c' acts like a monitor. - * Since the move to 'c' state is in one txn (to make it visible) and the rest of the - * operations in another (could even be made separate txns), there is a possibility of failure - * between the 2. Thus the AcidHouseKeeper logic to timeout txns should apply 'c' state txns. - * - * Or perhaps Select * TXNS where txn_id = " + txnid; for update + * Runs at READ_COMMITTED with S4U on TXNS row for "txnid". S4U ensures that no other + * operation can change this txn (such acquiring locks). While lock() and commitTxn() + * should not normally run concurrently (for same txn) but could due to bugs in the client + * which could then corrupt internal transaction manager state. Also competes with abortTxn(). */ - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - // Before we do the commit heartbeat the txn. This is slightly odd in that we're going to - // commit it, but it does two things. One, it makes sure the transaction is still valid. - // Two, it avoids the race condition where we time out between now and when we actually - // commit the transaction below. And it does this all in a dead-lock safe way by - // committing the heartbeat back to the database. 
- heartbeatTxn(dbConn, txnid); + + lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN); + if(lockHandle == null) { + //this also ensures that txn is still there and in expected state (hasn't been timed out) + ensureValidTxn(dbConn, txnid, stmt); + shouldNeverHappen(txnid); + } // Move the record from txn_components into completed_txn_components so that the compactor // knows where to look to compact. @@ -465,35 +471,213 @@ public void commitTxn(CommitTxnRequest rqst) throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { - closeStmt(stmt); - closeDbConn(dbConn); + close(lockHandle, stmt, dbConn); + unlock(); } } catch (RetryException e) { commitTxn(rqst); } } - public LockResponse lock(LockRequest rqst) - throws NoSuchTxnException, TxnAbortedException, MetaException { + /** + * As much as possible (i.e. in absence of retries) we want both operations to be done on the same + * connection (but separate transactions). This avoids some flakiness in BONECP where if you + * perform an operation on 1 connection and immediately get another from the pool, the 2nd one + * doesn't see results of the first. + */ + public LockResponse lock(LockRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException { + ConnectionLockIdPair connAndLockId = enqueueLockWithRetry(rqst); + try { + return checkLockWithRetry(connAndLockId.dbConn, connAndLockId.extLockId, rqst.getTxnid()); + } + catch(NoSuchLockException e) { + // This should never happen, as we just added the lock id + throw new MetaException("Couldn't find a lock we just created! " + e.getMessage()); + } + } + private static final class ConnectionLockIdPair { + private final Connection dbConn; + private final long extLockId; + private ConnectionLockIdPair(Connection dbConn, long extLockId) { + this.dbConn = dbConn; + this.extLockId = extLockId; + } + } + + /** + * Note that by definition select for update is divorced from update, i.e. 
you executeQuery() to read + and then executeUpdate(). One other alternative would be to actually update the row in TXNS but + to the same value as before thus forcing db to acquire write lock for duration of the transaction. + * + * There is no real reason to return the ResultSet here other than to make sure the reference to it + * is retained for duration of intended lock scope and is not GC'd thus (unlikely) causing lock + * to be released. + * @param txnState the state this txn is expected to be in. may be null + * @return null if no row was found + * @throws SQLException + * @throws MetaException + */ + private ResultSet lockTransactionRecord(Statement stmt, long txnId, Character txnState) throws SQLException, MetaException { + String query = "select TXN_STATE from TXNS where TXN_ID = " + txnId + (txnState != null ? " AND TXN_STATE=" + quoteChar(txnState) : ""); + ResultSet rs = stmt.executeQuery(addForUpdateClause(query)); + if(rs.next()) { + return rs; + } + close(rs); + return null; + } + + /** + * This enters locks into the queue in {@link #LOCK_WAITING} mode. + * + * Isolation Level Notes: + * 1. We use S4U (with read_committed) to generate the next (ext) lock id. This serializes + * any 2 {@code enqueueLockWithRetry()} calls. + * 2. 
We use S4U on the relevant TXNS row to block any concurrent abort/commit/etc operations + * @see #checkLockWithRetry(Connection, long, long) + */ + private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException { + boolean success = false; + Connection dbConn = null; + try { + Statement stmt = null; + ResultSet rs = null; + ResultSet lockHandle = null; + try { + lock(); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + long txnid = rqst.getTxnid(); + stmt = dbConn.createStatement(); + if (isValidTxn(txnid)) { + //this also ensures that txn is still there in expected state + lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN); + if(lockHandle == null) { + ensureValidTxn(dbConn, txnid, stmt); + shouldNeverHappen(txnid); + } + } + /** Get the next lock id. + * This has to be atomic with adding entries to HIVE_LOCK entries (1st add in W state) to prevent a race. + * Suppose ID gen is a separate txn and 2 concurrent lock() methods are running. 1st one generates nl_next=7, + * 2nd nl_next=8. Then 8 goes first to insert into HIVE_LOCKS and acquires the locks. 
Then 7 unblocks, + * and add it's W locks but it won't see locks from 8 since to be 'fair' {@link #checkLock(java.sql.Connection, long)} + * doesn't block on locks acquired later than one it's checking*/ + String s = addForUpdateClause("select nl_next from NEXT_LOCK_ID"); + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (!rs.next()) { + LOG.debug("Going to rollback"); + dbConn.rollback(); + throw new MetaException("Transaction tables not properly " + + "initialized, no record found in next_lock_id"); + } + long extLockId = rs.getLong(1); + s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1); + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + + if (txnid > 0) { + // For each component in this lock request, + // add an entry to the txn_components table + // This must be done before HIVE_LOCKS is accessed + for (LockComponent lc : rqst.getComponent()) { + String dbName = lc.getDbname(); + String tblName = lc.getTablename(); + String partName = lc.getPartitionname(); + s = "insert into TXN_COMPONENTS " + + "(tc_txnid, tc_database, tc_table, tc_partition) " + + "values (" + txnid + ", '" + dbName + "', " + + (tblName == null ? "null" : "'" + tblName + "'") + ", " + + (partName == null ? 
"null" : "'" + partName + "'") + ")"; + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + } + } + + long intLockId = 0; + for (LockComponent lc : rqst.getComponent()) { + intLockId++; + String dbName = lc.getDbname(); + String tblName = lc.getTablename(); + String partName = lc.getPartitionname(); + LockType lockType = lc.getType(); + char lockChar = 'z'; + switch (lockType) { + case EXCLUSIVE: + lockChar = LOCK_EXCLUSIVE; + break; + case SHARED_READ: + lockChar = LOCK_SHARED; + break; + case SHARED_WRITE: + lockChar = LOCK_SEMI_SHARED; + break; + } + long now = getDbTime(dbConn); + s = "insert into HIVE_LOCKS " + + " (hl_lock_ext_id, hl_lock_int_id, hl_txnid, hl_db, hl_table, " + + "hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, hl_user, hl_host)" + + " values (" + extLockId + ", " + + +intLockId + "," + txnid + ", '" + + dbName + "', " + (tblName == null ? "null" : "'" + tblName + "'") + + ", " + (partName == null ? "null" : "'" + partName + "'") + + ", '" + LOCK_WAITING + "', " + "'" + lockChar + "', " + + //for locks associated with a txn, we always heartbeat txn and timeout based on that + (isValidTxn(txnid) ? 0 : now) + ", '" + + rqst.getUser() + "', '" + rqst.getHostname() + "')"; + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + } + dbConn.commit(); + success = true; + return new ConnectionLockIdPair(dbConn, extLockId); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "enqueueLockWithRetry(" + rqst + ")"); + throw new MetaException("Unable to update transaction database " + + StringUtils.stringifyException(e)); + } finally { + close(lockHandle); + close(rs, stmt, null); + if (!success) { + /* This needs to return a "live" connection to be used by operation that follows it. + Thus it only closes Connection on failure/retry. 
*/ + closeDbConn(dbConn); + } + unlock(); + } + } + catch(RetryException e) { + return enqueueLockWithRetry(rqst); + } + } + private LockResponse checkLockWithRetry(Connection dbConn, long extLockId, long txnId) + throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException { try { - Connection dbConn = null; try { - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); - return lock(dbConn, rqst); + lock(); + if(dbConn.isClosed()) { + //should only get here if retrying this op + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + } + dbConn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); + return checkLock(dbConn, extLockId); } catch (SQLException e) { LOG.debug("Going to rollback"); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "lock(" + rqst + ")"); + checkRetryable(dbConn, e, "checkLockWithRetry(" + extLockId + "," + txnId + ")"); throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { + unlock(); closeDbConn(dbConn); } - } catch (RetryException e) { - return lock(rqst); + } + catch(RetryException e) { + return checkLockWithRetry(dbConn, extLockId, txnId); } } - /** * Why doesn't this get a txnid as parameter? The caller should either know the txnid or know there isn't one. * Either way getTxnIdFromLockId() will not be needed. This would be a Thrift change. @@ -515,6 +699,7 @@ public LockResponse checkLock(CheckLockRequest rqst) Connection dbConn = null; long extLockId = rqst.getLockid(); try { + lock(); dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); // Heartbeat on the lockid first, to assure that our lock is still valid. // Then look up the lock info (hopefully in the cache). If these locks @@ -529,6 +714,9 @@ public LockResponse checkLock(CheckLockRequest rqst) else { heartbeatLock(dbConn, extLockId); } + //todo: strictly speaking there is a bug here. 
heartbeat*() commits but both heartbeat and + //checkLock() are in the same retry block, so if checkLock() throws, heartbeat is also retried + //extra heartbeat is logically harmless, but ... dbConn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); return checkLock(dbConn, extLockId); } catch (SQLException e) { @@ -539,6 +727,7 @@ public LockResponse checkLock(CheckLockRequest rqst) JavaUtils.lockIdToString(extLockId) + " " + StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); + unlock(); } } catch (RetryException e) { return checkLock(rqst); @@ -549,6 +738,9 @@ public LockResponse checkLock(CheckLockRequest rqst) /** * This would have been made simpler if all locks were associated with a txn. Then only txn needs to * be heartbeated, committed, etc. no need for client to track individual locks. + * When removing locks not associated with txn this potentially conflicts with + * heartbeat/performTimeout which are update/delete of HIVE_LOCKS thus will be locked as needed by db. 
+ * since this only removes from HIVE_LOCKS at worst some lock acquire is delayed */ public void unlock(UnlockRequest rqst) throws NoSuchLockException, TxnOpenException, MetaException { @@ -573,6 +765,8 @@ public void unlock(UnlockRequest rqst) //hl_txnid <> 0 means it's associated with a transaction String s = "delete from HIVE_LOCKS where hl_lock_ext_id = " + extLockId + " AND (hl_txnid = 0 OR" + " (hl_txnid <> 0 AND hl_lock_state = '" + LOCK_WAITING + "'))"; + //(hl_txnid <> 0 AND hl_lock_state = '" + LOCK_WAITING + "') is for multi-statement txns where + //some query attempted to lock (thus LOCK_WAITING state) but is giving up due to timeout for example LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); if (rc < 1) { @@ -735,8 +929,9 @@ public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); for (long txn = rqst.getMin(); txn <= rqst.getMax(); txn++) { try { - //todo: this is expensive call: at least 2 update queries per txn - //is this really worth it? 
+ //todo: do all updates in 1 SQL statement and check update count + //if update count is less than was requested, go into more expensive checks + //for each txn heartbeatTxn(dbConn, txn); } catch (NoSuchTxnException e) { nosuch.add(txn); @@ -765,11 +960,12 @@ public long compact(CompactionRequest rqst) throws MetaException { Connection dbConn = null; Statement stmt = null; try { - dbConn = getDbConn(getRequiredIsolationLevel()); + lock(); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); // Get the id for the next entry in the queue - String s = addForUpdateClause(dbConn, "select ncq_next from NEXT_COMPACTION_QUEUE_ID"); + String s = addForUpdateClause("select ncq_next from NEXT_COMPACTION_QUEUE_ID"); LOG.debug("going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); if (!rs.next()) { @@ -836,6 +1032,7 @@ public long compact(CompactionRequest rqst) throws MetaException { } finally { closeStmt(stmt); closeDbConn(dbConn); + unlock(); } } catch (RetryException e) { return compact(rqst); @@ -909,16 +1106,25 @@ public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaExcep } } + private static void shouldNeverHappen(long txnid) { + throw new RuntimeException("This should never happen: " + JavaUtils.txnIdToString(txnid)); + } public void addDynamicPartitions(AddDynamicPartitions rqst) throws NoSuchTxnException, TxnAbortedException, MetaException { Connection dbConn = null; Statement stmt = null; + ResultSet lockHandle = null; try { try { + lock(); dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - // Heartbeat this first to make sure the transaction is still valid. 
- heartbeatTxn(dbConn, rqst.getTxnid()); + lockHandle = lockTransactionRecord(stmt, rqst.getTxnid(), TXN_OPEN); + if(lockHandle == null) { + //ensures txn is still there and in expected state + ensureValidTxn(dbConn, rqst.getTxnid(), stmt); + shouldNeverHappen(rqst.getTxnid()); + } for (String partName : rqst.getPartitionnames()) { StringBuilder buff = new StringBuilder(); buff.append("insert into TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition) values ("); @@ -943,8 +1149,8 @@ public void addDynamicPartitions(AddDynamicPartitions rqst) throw new MetaException("Unable to insert into from transaction database " + StringUtils.stringifyException(e)); } finally { - closeStmt(stmt); - closeDbConn(dbConn); + close(lockHandle, stmt, dbConn); + unlock(); } } catch (RetryException e) { addDynamicPartitions(rqst); @@ -990,13 +1196,15 @@ public long setTimeout(long milliseconds) { protected Connection getDbConn(int isolationLevel) throws SQLException { int rc = doRetryOnConnPool ? 10 : 1; + Connection dbConn = null; while (true) { try { - Connection dbConn = connPool.getConnection(); + dbConn = connPool.getConnection(); dbConn.setAutoCommit(false); dbConn.setTransactionIsolation(isolationLevel); return dbConn; } catch (SQLException e){ + closeDbConn(dbConn); if ((--rc) <= 0) throw e; LOG.error("There is a problem with a connection from the pool, retrying(rc=" + rc + "): " + getMessage(e), e); @@ -1013,7 +1221,9 @@ void rollbackDBConn(Connection dbConn) { } protected void closeDbConn(Connection dbConn) { try { - if (dbConn != null && !dbConn.isClosed()) dbConn.close(); + if (dbConn != null && !dbConn.isClosed()) { + dbConn.close(); + } } catch (SQLException e) { LOG.warn("Failed to close db connection " + getMessage(e)); } @@ -1077,8 +1287,8 @@ protected void checkRetryable(Connection conn, // Derby and newer MySQL driver use the new SQLTransactionRollbackException boolean sendRetrySignal = false; try { - if (dbProduct == null && conn != null) { - 
determineDatabaseProduct(conn); + if(dbProduct == null) { + throw new IllegalStateException("DB Type not determined yet."); } if (e instanceof SQLTransactionRollbackException || ((dbProduct == DatabaseProduct.MYSQL || dbProduct == DatabaseProduct.POSTGRES || @@ -1144,8 +1354,7 @@ protected long getDbTime(Connection conn) throws MetaException { try { stmt = conn.createStatement(); String s; - DatabaseProduct prod = determineDatabaseProduct(conn); - switch (prod) { + switch (dbProduct) { case DERBY: s = "values current_timestamp"; break; @@ -1161,7 +1370,7 @@ protected long getDbTime(Connection conn) throws MetaException { break; default: - String msg = "Unknown database product: " + prod.toString(); + String msg = "Unknown database product: " + dbProduct.toString(); LOG.error(msg); throw new MetaException(msg); } @@ -1197,16 +1406,15 @@ protected String getIdentifierQuoteString(Connection conn) throws SQLException { * Determine the database product type * @param conn database connection * @return database product type - * @throws MetaException if the type cannot be determined or is unknown */ - protected DatabaseProduct determineDatabaseProduct(Connection conn) throws MetaException { + private DatabaseProduct determineDatabaseProduct(Connection conn) { if (dbProduct == null) { - try {//todo: make this work when conn == null + try { String s = conn.getMetaData().getDatabaseProductName(); if (s == null) { String msg = "getDatabaseProductName returns null, can't determine database product"; LOG.error(msg); - throw new MetaException(msg); + throw new IllegalStateException(msg); } else if (s.equals("Apache Derby")) { dbProduct = DatabaseProduct.DERBY; } else if (s.equals("Microsoft SQL Server")) { @@ -1220,13 +1428,13 @@ protected DatabaseProduct determineDatabaseProduct(Connection conn) throws MetaE } else { String msg = "Unrecognized database product name <" + s + ">"; LOG.error(msg); - throw new MetaException(msg); + throw new IllegalStateException(msg); } } catch 
(SQLException e) { String msg = "Unable to get database product name: " + e.getMessage(); LOG.error(msg); - throw new MetaException(msg); + throw new IllegalStateException(msg); } } return dbProduct; @@ -1398,7 +1606,7 @@ private void checkQFileTestHack() { // We may have already created the tables and thus don't need to redo it. if (!e.getMessage().contains("already exists")) { throw new RuntimeException("Unable to set up transaction database for" + - " testing: " + e.getMessage()); + " testing: " + e.getMessage(), e); } } } @@ -1410,6 +1618,9 @@ private int abortTxns(Connection dbConn, List txnids) throws SQLException /** * TODO: expose this as an operation to client. Useful for streaming API to abort all remaining * trasnactions in a batch on IOExceptions. + * Caller must rollback the transaction if not all transactions were aborted since this will not + * attempt to delete associated locks in this case. + * * @param dbConn An active connection * @param txnids list of transactions to abort * @param max_heartbeat value used by {@link #performTimeOuts()} to ensure this doesn't Abort txn which were @@ -1423,17 +1634,12 @@ private int abortTxns(Connection dbConn, List txnids, long max_heartbeat) if (txnids.isEmpty()) { return 0; } - if(Connection.TRANSACTION_SERIALIZABLE != dbConn.getTransactionIsolation()) { - /** Running this at SERIALIZABLE prevents new locks being added for this txnid(s) concurrently - * which would cause them to become orphaned. - */ - throw new IllegalStateException("Expected SERIALIZABLE isolation. Found " + dbConn.getTransactionIsolation()); - } try { stmt = dbConn.createStatement(); - - // delete from HIVE_LOCKS first, we always access HIVE_LOCKS before TXNS - StringBuilder buf = new StringBuilder("delete from HIVE_LOCKS where hl_txnid in ("); + //This is an update statement, thus at any Isolation level will take Write locks so will block + //all other ops using S4U on TXNS row. 
+ StringBuilder buf = new StringBuilder("update TXNS set txn_state = '" + TXN_ABORTED + + "' where txn_state = '" + TXN_OPEN + "' and txn_id in ("); boolean first = true; for (Long id : txnids) { if (first) first = false; @@ -1441,13 +1647,22 @@ private int abortTxns(Connection dbConn, List txnids, long max_heartbeat) buf.append(id); } buf.append(')'); + if(max_heartbeat > 0) { + buf.append(" and txn_last_heartbeat < ").append(max_heartbeat); + } LOG.debug("Going to execute update <" + buf.toString() + ">"); - stmt.executeUpdate(buf.toString()); + updateCnt = stmt.executeUpdate(buf.toString()); + if(updateCnt < txnids.size()) { + /** + * have to bail in this case since we don't know which transactions were not Aborted and + * thus don't know which locks to delete + * This may happen due to a race between {@link #heartbeat(HeartbeatRequest)} operation and + * {@link #performTimeOuts()} + */ + return updateCnt; + } - //todo: seems like we should do this first and if it misses, don't bother with - //delete from HIVE_LOCKS since it will be rolled back - buf = new StringBuilder("update TXNS set txn_state = '" + TXN_ABORTED + - "' where txn_state = '" + TXN_OPEN + "' and txn_id in ("); + buf = new StringBuilder("delete from HIVE_LOCKS where hl_txnid in ("); first = true; for (Long id : txnids) { if (first) first = false; @@ -1455,155 +1670,14 @@ private int abortTxns(Connection dbConn, List txnids, long max_heartbeat) buf.append(id); } buf.append(')'); - if(max_heartbeat > 0) { - buf.append(" and txn_last_heartbeat < ").append(max_heartbeat); - } LOG.debug("Going to execute update <" + buf.toString() + ">"); - updateCnt = stmt.executeUpdate(buf.toString()); - + stmt.executeUpdate(buf.toString()); } finally { closeStmt(stmt); } return updateCnt; } - /** - * Isolation Level Notes: - * Run at SERIALIZABLE to make sure no one is adding new locks while we are checking conflicts here. 
- * - * Ramblings: - * We could perhaps get away with writing to TXN_COMPONENTS + HIVE_LOCKS in 1 txn@RC - * since this is just in Wait state. - * (Then we'd need to ensure that in !wait case we don't rely on rollback and again in case of - * failure, the W locks will timeout if failure does not propagate to client in some way, or it - * will and client will Abort). - * Actually, whether we can do this depends on what happens when you try to get a lock and notice - * a conflicting locks in W mode do we wait in this case? if so it's a problem because while you - * are checking new locks someone may insert new W locks that you don't see... - * On the other hand, this attempts to be 'fair', i.e. process locks in order so could we assume - * that additional W locks will have higher IDs???? - * - * We can use Select for Update to generate the next LockID. In fact we can easily do this in a separate txn. - * This avoids contention on NEXT_LOCK_ID. The rest of the logic will be still need to be done at Serializable, I think, - * but it will not be updating the same row from 2 DB. - * - * Request a lock - * @param dbConn database connection - * @param rqst lock information - * @return information on whether the lock was acquired. - * @throws NoSuchTxnException - * @throws TxnAbortedException - */ - private LockResponse lock(Connection dbConn, LockRequest rqst) - throws NoSuchTxnException, TxnAbortedException, MetaException, SQLException { - // We want to minimize the number of concurrent lock requests being issued. If we do not we - // get a large number of deadlocks in the database, since this method has to both clean - // timedout locks and insert new locks. This synchronization barrier will not eliminate all - // deadlocks, and the code is still resilient in the face of a database deadlock. But it - // will reduce the number. This could have been done via a lock table command in the - // underlying database, but was not for two reasons. 
One, different databases have different - // syntax for lock table, making it harder to use. Two, that would lock the HIVE_LOCKS table - // and prevent other operations (such as committing transactions, showing locks, - // etc.) that should not interfere with this one. - synchronized (lockLock) { - Statement stmt = null; - ResultSet rs = null; - try { - long txnid = rqst.getTxnid(); - if (txnid > 0) { - // Heartbeat the transaction so we know it is valid and we avoid it timing out while we - // are locking. - heartbeatTxn(dbConn, txnid); - } - stmt = dbConn.createStatement(); - - /** Get the next lock id. - * This has to be atomic with adding entries to HIVE_LOCK entries (1st add in W state) to prevent a race. - * Suppose ID gen is a separate txn and 2 concurrent lock() methods are running. 1st one generates nl_next=7, - * 2nd nl_next=8. Then 8 goes first to insert into HIVE_LOCKS and aquires the locks. Then 7 unblocks, - * and add it's W locks but it won't see locks from 8 since to be 'fair' {@link #checkLock(java.sql.Connection, long)} - * doesn't block on locks acquired later than one it's checking*/ - String s = addForUpdateClause(dbConn, "select nl_next from NEXT_LOCK_ID"); - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - if (!rs.next()) { - LOG.debug("Going to rollback"); - dbConn.rollback(); - throw new MetaException("Transaction tables not properly " + - "initialized, no record found in next_lock_id"); - } - long extLockId = rs.getLong(1); - s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1); - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); - - if (txnid > 0) { - // For each component in this lock request, - // add an entry to the txn_components table - // This must be done before HIVE_LOCKS is accessed - - //Isolation note: - //the !wait option is not actually used anywhere. 
W/o that, - // if we make CompactionTxnHandler.markCleaned() not delete anything above certain txn_id - //then there is not reason why this insert into TXN_COMPONENTS needs to run at Serializable. - // - // Again, w/o the !wait option, insert into HIVE_LOCKS should be OK at READ_COMMITTED as long - //as check lock is at serializable (or any other way to make sure it's exclusive) - for (LockComponent lc : rqst.getComponent()) { - String dbName = lc.getDbname(); - String tblName = lc.getTablename(); - String partName = lc.getPartitionname(); - s = "insert into TXN_COMPONENTS " + - "(tc_txnid, tc_database, tc_table, tc_partition) " + - "values (" + txnid + ", '" + dbName + "', " + - (tblName == null ? "null" : "'" + tblName + "'") + ", " + - (partName == null ? "null" : "'" + partName + "'") + ")"; - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); - } - } - - long intLockId = 0; - for (LockComponent lc : rqst.getComponent()) { - intLockId++; - String dbName = lc.getDbname(); - String tblName = lc.getTablename(); - String partName = lc.getPartitionname(); - LockType lockType = lc.getType(); - char lockChar = 'z'; - switch (lockType) { - case EXCLUSIVE: lockChar = LOCK_EXCLUSIVE; break; - case SHARED_READ: lockChar = LOCK_SHARED; break; - case SHARED_WRITE: lockChar = LOCK_SEMI_SHARED; break; - } - long now = getDbTime(dbConn); - s = "insert into HIVE_LOCKS " + - " (hl_lock_ext_id, hl_lock_int_id, hl_txnid, hl_db, hl_table, " + - "hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, hl_user, hl_host)" + - " values (" + extLockId + ", " + - + intLockId + "," + txnid + ", '" + - dbName + "', " + (tblName == null ? "null" : "'" + tblName + "'" ) - + ", " + (partName == null ? "null" : "'" + partName + "'") + - ", '" + LOCK_WAITING + "', " + "'" + lockChar + "', " + - //for locks associated with a txn, we always heartbeat txn and timeout based on that - (isValidTxn(txnid) ? 
0 : now) + ", '" + - rqst.getUser() + "', '" + rqst.getHostname() + "')"; - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); - } - /**to make txns shorter we could commit here and start a new txn for checkLock. This would - * require moving checkRetryable() down into here. Could we then run the part before this - * commit are READ_COMMITTED?*/ - return checkLock(dbConn, extLockId); - } catch (NoSuchLockException e) { - // This should never happen, as we just added the lock id - throw new MetaException("Couldn't find a lock we just created! " + e.getMessage()); - } finally { - close(rs); - closeStmt(stmt); - } - } - } private static boolean isValidTxn(long txnId) { return txnId != 0; } @@ -1618,12 +1692,17 @@ private static boolean isValidTxn(long txnId) { private LockResponse checkLock(Connection dbConn, long extLockId) throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException, SQLException { + if(dbConn.getTransactionIsolation() != Connection.TRANSACTION_SERIALIZABLE) { + //longer term we should instead use AUX_TABLE/S4U to serialize all checkLock() operations + //that would be less prone to deadlocks + throw new IllegalStateException("Unexpected Isolation Level: " + dbConn.getTransactionIsolation()); + } List locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId);//being acquired now LockResponse response = new LockResponse(); response.setLockid(extLockId); LOG.debug("checkLock(): Setting savepoint. 
extLockId=" + JavaUtils.lockIdToString(extLockId)); - Savepoint save = dbConn.setSavepoint(); + Savepoint save = dbConn.setSavepoint();//todo: get rid of this StringBuilder query = new StringBuilder("select hl_lock_ext_id, " + "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " + "hl_lock_type, hl_txnid from HIVE_LOCKS where hl_db in ("); @@ -2057,9 +2136,8 @@ private void timeOutLocks(Connection dbConn, long now) { * Make {@code noSelectsqlQuery} to be "a,b from T" and this method will return the * appropriately modified row limiting query. */ - private String addLimitClause(Connection dbConn, int numRows, String noSelectsqlQuery) throws MetaException { - DatabaseProduct prod = determineDatabaseProduct(dbConn); - switch (prod) { + private String addLimitClause(int numRows, String noSelectsqlQuery) throws MetaException { + switch (dbProduct) { case DERBY: //http://db.apache.org/derby/docs/10.7/ref/rrefsqljoffsetfetch.html return "select " + noSelectsqlQuery + " fetch first " + numRows + " rows only"; @@ -2076,7 +2154,7 @@ private String addLimitClause(Connection dbConn, int numRows, String noSelectsql //https://msdn.microsoft.com/en-us/library/ms189463.aspx return "select TOP(" + numRows + ") " + noSelectsqlQuery; default: - String msg = "Unrecognized database product name <" + prod + ">"; + String msg = "Unrecognized database product name <" + dbProduct + ">"; LOG.error(msg); throw new MetaException(msg); } @@ -2110,7 +2188,7 @@ public void performTimeOuts() { stmt = dbConn.createStatement(); String s = " txn_id from TXNS where txn_state = '" + TXN_OPEN + "' and txn_last_heartbeat < " + (now - timeout); - s = addLimitClause(dbConn, 250 * TIMED_OUT_TXN_ABORT_BATCH_SIZE, s); + s = addLimitClause(250 * TIMED_OUT_TXN_ABORT_BATCH_SIZE, s); LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); if(!rs.next()) { @@ -2127,8 +2205,7 @@ public void performTimeOuts() { } } while(rs.next()); dbConn.commit(); - close(rs, stmt, dbConn); - 
dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + close(rs, stmt, null); int numTxnsAborted = 0; for(List batchToAbort : timedOutTxns) { if(abortTxns(dbConn, batchToAbort, now - timeout) == batchToAbort.size()) { @@ -2331,45 +2408,11 @@ private static String getMessage(SQLException ex) { return ex.getMessage() + "(SQLState=" + ex.getSQLState() + ",ErrorCode=" + ex.getErrorCode() + ")"; } /** - * Returns one of {@link java.sql.Connection#TRANSACTION_SERIALIZABLE} TRANSACTION_READ_COMMITTED, etc. - * Different DBs support different concurrency management options. This class relies on SELECT ... FOR UPDATE - * functionality. Where that is not available, SERIALIZABLE isolation is used. - * This method must always agree with {@link #addForUpdateClause(java.sql.Connection, String)}, in that - * if FOR UPDATE is not available, must run operation at SERIALIZABLE. - */ - private int getRequiredIsolationLevel() throws MetaException, SQLException { - if(dbProduct == null) { - Connection tmp = null; - try { - tmp = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - determineDatabaseProduct(tmp); - } - finally { - closeDbConn(tmp); - } - } - switch (dbProduct) { - case DERBY: - return Connection.TRANSACTION_SERIALIZABLE; - case MYSQL: - case ORACLE: - case POSTGRES: - case SQLSERVER: - return Connection.TRANSACTION_READ_COMMITTED; - default: - String msg = "Unrecognized database product name <" + dbProduct + ">"; - LOG.error(msg); - throw new MetaException(msg); - } - } - /** * Given a {@code selectStatement}, decorated it with FOR UPDATE or semantically equivalent - * construct. If the DB doesn't support, return original select. This method must always - * agree with {@link #getRequiredIsolationLevel()} + * construct. If the DB doesn't support, return original select. 
*/ - private String addForUpdateClause(Connection dbConn, String selectStatement) throws MetaException { - DatabaseProduct prod = determineDatabaseProduct(dbConn); - switch (prod) { + private String addForUpdateClause(String selectStatement) throws MetaException { + switch (dbProduct) { case DERBY: //https://db.apache.org/derby/docs/10.1/ref/rrefsqlj31783.html //sadly in Derby, FOR UPDATE doesn't meant what it should @@ -2386,7 +2429,7 @@ private String addForUpdateClause(Connection dbConn, String selectStatement) thr //https://msdn.microsoft.com/en-us/library/ms187373.aspx return selectStatement + " with(updlock)"; default: - String msg = "Unrecognized database product name <" + prod + ">"; + String msg = "Unrecognized database product name <" + dbProduct + ">"; LOG.error(msg); throw new MetaException(msg); } @@ -2394,6 +2437,9 @@ private String addForUpdateClause(Connection dbConn, String selectStatement) thr static String quoteString(String input) { return "'" + input + "'"; } + static String quoteChar(char c) { + return "'" + c + "'"; + } static CompactionType dbCompactionType2ThriftType(char dbValue) { switch (dbValue) { case MAJOR_TYPE: @@ -2416,4 +2462,20 @@ static Character thriftCompactionType2DbType(CompactionType ct) { return null; } } + + /** + * {@link #lock()} and {@link #unlock()} are used to serialize those operations that require + * Select ... For Update to sequence operations properly. In practice that means when running + * with Derby database. See more notes at class level. 
+ */ + private void lock() { + if(dbProduct == DatabaseProduct.DERBY) { + derbyLock.lock(); + } + } + private void unlock() { + if(dbProduct == DatabaseProduct.DERBY) { + derbyLock.unlock(); + } + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 47dbbb3..e8ebe55 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -407,7 +407,7 @@ void startHeartbeat(long delay) throws LockException { private void stopHeartbeat() { if (heartbeatTask != null && !heartbeatTask.isCancelled() && !heartbeatTask.isDone()) { - heartbeatTask.cancel(true); + heartbeatTask.cancel(false); heartbeatTask = null; } }