diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 4d9e8ae..5e4c7be 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -56,6 +56,7 @@ public CompactionTxnHandler(HiveConf conf) { Connection dbConn = null; Set response = new HashSet(); Statement stmt = null; + ResultSet rs = null; try { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); @@ -64,7 +65,7 @@ public CompactionTxnHandler(HiveConf conf) { String s = "select distinct ctc_database, ctc_table, " + "ctc_partition from COMPLETED_TXN_COMPONENTS"; LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); + rs = stmt.executeQuery(s); while (rs.next()) { CompactionInfo info = new CompactionInfo(); info.dbname = rs.getString(1); @@ -72,6 +73,7 @@ public CompactionTxnHandler(HiveConf conf) { info.partName = rs.getString(3); response.add(info); } + rs.close(); // Check for aborted txns s = "select tc_database, tc_table, tc_partition " + @@ -97,8 +99,7 @@ public CompactionTxnHandler(HiveConf conf) { LOG.error("Unable to connect to transaction database " + e.getMessage()); checkRetryable(dbConn, e, "findPotentialCompactions(maxAborted:" + maxAborted + ")"); } finally { - closeDbConn(dbConn); - closeStmt(stmt); + close(rs, stmt, dbConn); } return response; } @@ -118,7 +119,7 @@ public void setRunAs(long cq_id, String user) throws MetaException { Connection dbConn = null; Statement stmt = null; try { - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' where cq_id = " + cq_id; LOG.debug("Going to execute update <" + s + ">"); @@ -153,46 +154,58 @@ public void 
setRunAs(long cq_id, String user) throws MetaException { public CompactionInfo findNextToCompact(String workerId) throws MetaException { try { Connection dbConn = null; - CompactionInfo info = new CompactionInfo(); - Statement stmt = null; + ResultSet rs = null; try { - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); String s = "select cq_id, cq_database, cq_table, cq_partition, " + "cq_type from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'"; LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); + rs = stmt.executeQuery(s); if (!rs.next()) { LOG.debug("No compactions found ready to compact"); dbConn.rollback(); return null; } - info.id = rs.getLong(1); - info.dbname = rs.getString(2); - info.tableName = rs.getString(3); - info.partName = rs.getString(4); - switch (rs.getString(5).charAt(0)) { - case MAJOR_TYPE: info.type = CompactionType.MAJOR; break; - case MINOR_TYPE: info.type = CompactionType.MINOR; break; - default: throw new MetaException("Unexpected compaction type " + rs.getString(5)); - } - - // Now, update this record as being worked on by this worker. 
- long now = getDbTime(dbConn); - s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " + - "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id; - LOG.debug("Going to execute update <" + s + ">"); - int updCount = stmt.executeUpdate(s); - if (updCount != 1) { + do { + CompactionInfo info = new CompactionInfo(); + info.id = rs.getLong(1); + info.dbname = rs.getString(2); + info.tableName = rs.getString(3); + info.partName = rs.getString(4); + switch (rs.getString(5).charAt(0)) { + case MAJOR_TYPE: + info.type = CompactionType.MAJOR; + break; + case MINOR_TYPE: + info.type = CompactionType.MINOR; + break; + default: + throw new MetaException("Unexpected compaction type " + rs.getString(5)); + } + // Now, update this record as being worked on by this worker. + long now = getDbTime(dbConn); + s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " + + "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id + + " AND cq_state='" + INITIATED_STATE + "'"; + LOG.debug("Going to execute update <" + s + ">"); + int updCount = stmt.executeUpdate(s); + if(updCount == 1) { + dbConn.commit(); + return info; + } + if(updCount == 0) { + LOG.debug("Another Worker picked up " + info); + continue; + } LOG.error("Unable to set to cq_state=" + WORKING_STATE + " for compaction record: " + - info + ". updCnt=" + updCount); - LOG.debug("Going to rollback"); + info + ". 
updCnt=" + updCount + "."); dbConn.rollback(); - } - LOG.debug("Going to commit"); - dbConn.commit(); - return info; + return null; + } while( rs.next()); + dbConn.rollback(); + return null; } catch (SQLException e) { LOG.error("Unable to select next element for compaction, " + e.getMessage()); LOG.debug("Going to rollback"); @@ -201,8 +214,7 @@ public CompactionInfo findNextToCompact(String workerId) throws MetaException { throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { - closeDbConn(dbConn); - closeStmt(stmt); + close(rs, stmt, dbConn); } } catch (RetryException e) { return findNextToCompact(workerId); @@ -219,7 +231,7 @@ public void markCompacted(CompactionInfo info) throws MetaException { Connection dbConn = null; Statement stmt = null; try { - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); String s = "update COMPACTION_QUEUE set cq_state = '" + READY_FOR_CLEANING + "', " + "cq_worker_id = null where cq_id = " + info.id; @@ -240,8 +252,8 @@ public void markCompacted(CompactionInfo info) throws MetaException { throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { - closeDbConn(dbConn); closeStmt(stmt); + closeDbConn(dbConn); } } catch (RetryException e) { markCompacted(info); @@ -258,6 +270,7 @@ public void markCompacted(CompactionInfo info) throws MetaException { List rc = new ArrayList(); Statement stmt = null; + ResultSet rs = null; try { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); @@ -265,7 +278,7 @@ public void markCompacted(CompactionInfo info) throws MetaException { String s = "select cq_id, cq_database, cq_table, cq_partition, " + "cq_type, cq_run_as from COMPACTION_QUEUE where cq_state = '" + READY_FOR_CLEANING + "'"; LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = 
stmt.executeQuery(s); + rs = stmt.executeQuery(s); while (rs.next()) { CompactionInfo info = new CompactionInfo(); info.id = rs.getLong(1); @@ -291,8 +304,7 @@ public void markCompacted(CompactionInfo info) throws MetaException { throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { - closeDbConn(dbConn); - closeStmt(stmt); + close(rs, stmt, dbConn); } } catch (RetryException e) { return findReadyToClean(); @@ -303,23 +315,28 @@ public void markCompacted(CompactionInfo info) throws MetaException { * This will remove an entry from the queue after * it has been compacted. * - * todo: possibly a problem? Worker will start with DB in state X (wrt this partition). + * todo: Worker will start with DB in state X (wrt this partition). * while it's working more txns will happen, against partition it's compacting. * then this will delete state up to X and since then. There may be new delta files created * between compaction starting and cleaning. These will not be compacted until more * transactions happen. So this ideally should only delete * up to TXN_ID that was compacted (i.e. HWM in Worker?) Then this can also run - * at READ_COMMITTED + * at READ_COMMITTED. So this means we'd want to store HWM in COMPACTION_QUEUE when + * Worker picks up the job. * * Also, by using this method when Worker fails, we prevent future compactions from - * running until more data is written to tale or compaction is invoked explicitly + * running until more data is written to table or compaction is invoked explicitly * @param info info on the compaction entry to remove */ public void markCleaned(CompactionInfo info) throws MetaException { try { Connection dbConn = null; Statement stmt = null; + ResultSet rs = null; try { + //do we need serializable? Once we have the HWM as above, no. 
Before that + //it's debatable, but problem described above applies either way + //Thus can drop to RC dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); stmt = dbConn.createStatement(); String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id; @@ -344,19 +361,20 @@ public void markCleaned(CompactionInfo info) throws MetaException { "marking compaction entry as clean!"); } - + //todo: add distinct in query s = "select txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" + TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and tc_table = '" + info.tableName + "'"; if (info.partName != null) s += " and tc_partition = '" + info.partName + "'"; LOG.debug("Going to execute update <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); + rs = stmt.executeQuery(s); Set txnids = new HashSet(); while (rs.next()) txnids.add(rs.getLong(1)); if (txnids.size() > 0) { // Remove entries from txn_components, as there may be aborted txn components StringBuilder buf = new StringBuilder(); + //todo: add a safeguard to make sure IN clause is not too large; break up by txn id buf.append("delete from TXN_COMPONENTS where tc_txnid in ("); boolean first = true; for (long id : txnids) { @@ -394,8 +412,7 @@ public void markCleaned(CompactionInfo info) throws MetaException { throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { - closeDbConn(dbConn); - closeStmt(stmt); + close(rs, stmt, dbConn); } } catch (RetryException e) { markCleaned(info); @@ -409,14 +426,17 @@ public void cleanEmptyAbortedTxns() throws MetaException { try { Connection dbConn = null; Statement stmt = null; + ResultSet rs = null; try { - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + //Aborted is a terminal state, so nothing about the txn can change + //after that, so READ COMMITTED is sufficient. 
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); String s = "select txn_id from TXNS where " + "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " + "txn_state = '" + TXN_ABORTED + "'"; LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); + rs = stmt.executeQuery(s); Set txnids = new HashSet(); while (rs.next()) txnids.add(rs.getLong(1)); if (txnids.size() > 0) { @@ -443,8 +463,7 @@ public void cleanEmptyAbortedTxns() throws MetaException { throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { - closeDbConn(dbConn); - closeStmt(stmt); + close(rs, stmt, dbConn); } } catch (RetryException e) { cleanEmptyAbortedTxns(); @@ -465,7 +484,7 @@ public void revokeFromLocalWorkers(String hostname) throws MetaException { Connection dbConn = null; Statement stmt = null; try { - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '" + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_worker_id like '" @@ -485,8 +504,8 @@ public void revokeFromLocalWorkers(String hostname) throws MetaException { throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { - closeDbConn(dbConn); closeStmt(stmt); + closeDbConn(dbConn); } } catch (RetryException e) { revokeFromLocalWorkers(hostname); @@ -507,7 +526,7 @@ public void revokeTimedoutWorkers(long timeout) throws MetaException { Connection dbConn = null; Statement stmt = null; try { - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); long latestValidStart = getDbTime(dbConn) - timeout; stmt = dbConn.createStatement(); String s = "update COMPACTION_QUEUE set 
cq_worker_id = null, cq_start = null, cq_state = '" @@ -528,8 +547,8 @@ public void revokeTimedoutWorkers(long timeout) throws MetaException { throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { - closeDbConn(dbConn); closeStmt(stmt); + closeDbConn(dbConn); } } catch (RetryException e) { revokeTimedoutWorkers(timeout); diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 5c5e6ff..e46c814 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -22,6 +22,7 @@ import org.apache.commons.dbcp.ConnectionFactory; import org.apache.commons.dbcp.DriverManagerConnectionFactory; import org.apache.commons.dbcp.PoolableConnectionFactory; +import org.apache.tools.ant.taskdefs.Java; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.commons.dbcp.PoolingDataSource; @@ -141,7 +142,13 @@ public TxnHandler(HiveConf conf) { deadlockRetryInterval = retryInterval / 10; } - + /** + * This method can run at READ_COMMITTED as long as long as + * {@link #openTxns(org.apache.hadoop.hive.metastore.api.OpenTxnRequest)} is atomic. + * More specifically, as long as advancing TransactionID in NEXT_TXN_ID is atomic with + * adding corresponding entries into TXNS. The reason is that any txnid below HWM + * is either in TXNS and thus considered open (Open/Aborted) or it's considered Committed. + */ public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { try { // We need to figure out the current transaction number and the list of @@ -150,12 +157,13 @@ public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { // subsequently shows up in the open list that's ok. 
Connection dbConn = null; Statement stmt = null; + ResultSet rs = null; try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); String s = "select ntxn_next - 1 from NEXT_TXN_ID"; LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); + rs = stmt.executeQuery(s); if (!rs.next()) { throw new MetaException("Transaction tables not properly " + "initialized, no record found in next_txn_id"); @@ -165,7 +173,7 @@ public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { throw new MetaException("Transaction tables not properly " + "initialized, null record found in next_txn_id"); } - + close(rs); List txnInfo = new ArrayList(); //need the WHERE clause below to ensure consistent results with READ_COMMITTED s = "select txn_id, txn_state, txn_user, txn_host from TXNS where txn_id <= " + hwm; @@ -199,14 +207,17 @@ public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { throw new MetaException("Unable to select from transaction database: " + getMessage(e) + StringUtils.stringifyException(e)); } finally { - closeStmt(stmt); - closeDbConn(dbConn); + close(rs, stmt, dbConn); } } catch (RetryException e) { return getOpenTxnsInfo(); } } + /** + * This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()} + * @throws MetaException + */ public GetOpenTxnsResponse getOpenTxns() throws MetaException { try { // We need to figure out the current transaction number and the list of @@ -215,12 +226,13 @@ public GetOpenTxnsResponse getOpenTxns() throws MetaException { // subsequently shows up in the open list that's ok. 
Connection dbConn = null; Statement stmt = null; + ResultSet rs = null; try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); String s = "select ntxn_next - 1 from NEXT_TXN_ID"; LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); + rs = stmt.executeQuery(s); if (!rs.next()) { throw new MetaException("Transaction tables not properly " + "initialized, no record found in next_txn_id"); @@ -230,7 +242,7 @@ public GetOpenTxnsResponse getOpenTxns() throws MetaException { throw new MetaException("Transaction tables not properly " + "initialized, null record found in next_txn_id"); } - + close(rs); Set openList = new HashSet(); //need the WHERE clause below to ensure consistent results with READ_COMMITTED s = "select txn_id from TXNS where txn_id <= " + hwm; @@ -249,8 +261,7 @@ public GetOpenTxnsResponse getOpenTxns() throws MetaException { throw new MetaException("Unable to select from transaction database, " + StringUtils.stringifyException(e)); } finally { - closeStmt(stmt); - closeDbConn(dbConn); + close(rs, stmt, dbConn); } } catch (RetryException e) { return getOpenTxns(); @@ -279,22 +290,40 @@ public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long return new ValidReadTxnList(exceptions, highWater); } + /** + * To make {@link #getOpenTxns()}/{@link #getOpenTxnsInfo()} work correctly, this operation must ensure + * that advancing the counter in NEXT_TXN_ID and adding appropriate entries to TXNS is atomic. + * Also, advancing the counter must work when multiple metastores are running, thus either + * SELECT ... FOR UPDATE is used or SERIALIZABLE isolation. The former is preferred since it prevents + * concurrent DB transactions being rolled back due to Write-Write conflict on NEXT_TXN_ID. + * + * In the current design, there can be several metastore instances running in a given Warehouse. 
+ * This makes ideas like reserving a range of IDs to save trips to DB impossible. For example, + * a client may go to MS1 and start a transaction with ID 500 to update a particular row. + * Now the same client will start another transaction, except it ends up on MS2 and may get + * transaction ID 400 and update the same row. Now the merge that happens to materialize the snapshot + * on read will thing the version of the row from transaction ID 500 is the latest one. + * + * Longer term we can consider running Active-Passive MS (at least wrt to ACID operations). This + * set could support a write-through cache for added performance. + */ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { int numTxns = rqst.getNum_txns(); try { Connection dbConn = null; Statement stmt = null; + ResultSet rs = null; try { - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + dbConn = getDbConn(getRequiredIsolationLevel()); // Make sure the user has not requested an insane amount of txns. int maxTxns = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH); if (numTxns > maxTxns) numTxns = maxTxns; stmt = dbConn.createStatement(); - String s = "select ntxn_next from NEXT_TXN_ID"; + String s = addForUpdateClause(dbConn, "select ntxn_next from NEXT_TXN_ID"); LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); + rs = stmt.executeQuery(s); if (!rs.next()) { throw new MetaException("Transaction database not properly " + "configured, can't find next transaction id."); @@ -312,6 +341,8 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { List txnIds = new ArrayList(numTxns); for (long i = first; i < first + numTxns; i++) { ps.setLong(1, i); + //todo: this would be more efficient with a single insert with multiple rows in values() + //need add a safeguard to not exceed the DB capabilities. 
ps.executeUpdate(); txnIds.add(i); } @@ -326,8 +357,7 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); } finally { - closeStmt(stmt); - closeDbConn(dbConn); + close(rs, stmt, dbConn); } } catch (RetryException e) { return openTxns(rqst); @@ -362,6 +392,22 @@ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaExcept } } + /** + * This has to run at SERIALIZABLE to make no concurrent attempt to acquire locks (insert into HIVE_LOCKS) + * can happen. Otherwise we may end up with orphaned locks. While lock() and commitTxn() should not + * normally run concurrently (for same txn) but could due to bugs in the client which could then + * (w/o SERIALIZABLE) corrupt internal transaction manager state. + * + * Sketch of an improvement: + * Introduce a new transaction state in TXNS, state 'c'. This is a transient Committed state. + * commitTxn() would mark the txn 'c' in TXNS in an independent txn. Other operation like + * lock(), heartbeat(), etc would raise errors for txn in 'c' state and getOpenTxns(), etc would + * treat 'c' txn as 'open'. Then this method could run in READ COMMITTED since the + * entry for this txn in TXNS in 'c' acts like a monitor. + * Since the move to 'c' state is in one txn (to make it visible) and the rest of the + * operations in another (could even be made separate txns), there is a possibility of failure + * between the 2. Thus the AcidHouseKeeper logic to timeout txns should apply 'c' state txns. + */ public void commitTxn(CommitTxnRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException { long txnid = rqst.getTxnid(); @@ -438,6 +484,7 @@ public LockResponse lock(LockRequest rqst) } } + //this is not actually used anywhere. 
There is Thrift method that maps to this public LockResponse lockNoWait(LockRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException { try { @@ -459,27 +506,54 @@ public LockResponse lockNoWait(LockRequest rqst) } } + /** + * Why doesn't this get a txnid as parameter? The caller should either know the txnid or know there isn't one. + * Either way getTxnIdFromLockId() will not be needed. This would be a Thrift change. + * + * Also, when lock acquisition returns WAITING, it's retried every 15 seconds (worst case, see DbLockManager.backoff()) + * which means this is heartbeating way more often than hive.txn.timeout and creating extra load on DB. + * + * The clients that operate in blocking mode, can't heartbeat a lock until the lock is acquired. + * We should make CheckLockRequest inlclude timestamp or last request to skip unnecesasry heartbeats. Thrift change. + * + * checkLock(dbConn, extLockId, true) must run at SERIALIZABLE but this method can heartbeat in + * separate txn at READ_COMMITTED. + * + * @param rqst + * @return + * @throws NoSuchTxnException + * @throws NoSuchLockException + * @throws TxnAbortedException + * @throws MetaException + */ public LockResponse checkLock(CheckLockRequest rqst) throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException { try { Connection dbConn = null; + long extLockId = rqst.getLockid(); try { - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); - long extLockId = rqst.getLockid(); - + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); // Heartbeat on the lockid first, to assure that our lock is still valid. // Then look up the lock info (hopefully in the cache). If these locks // are associated with a transaction then heartbeat on that as well. 
- heartbeatLock(dbConn, extLockId); - long txnid = getTxnIdFromLockId(dbConn, extLockId); - if (txnid > 0) heartbeatTxn(dbConn, txnid); + Long txnid = getTxnIdFromLockId(dbConn, extLockId); + if(txnid == null) { + throw new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId)); + } + if (txnid > 0) { + heartbeatTxn(dbConn, txnid); + } + else { + heartbeatLock(dbConn, extLockId); + } + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); return checkLock(dbConn, extLockId, true); } catch (SQLException e) { LOG.debug("Going to rollback"); rollbackDBConn(dbConn); checkRetryable(dbConn, e, "checkLock(" + rqst + " )"); throw new MetaException("Unable to update transaction database " + - StringUtils.stringifyException(e)); + JavaUtils.lockIdToString(extLockId) + " " + StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); } @@ -489,39 +563,48 @@ public LockResponse checkLock(CheckLockRequest rqst) } + /** + * This would have been made simpler if all locks were associated with a txn. Then only txn needs to + * be heartbeated, committed, etc. no need for client to track individual locks. + * @param rqst + * @throws NoSuchLockException + * @throws TxnOpenException + * @throws MetaException + */ public void unlock(UnlockRequest rqst) throws NoSuchLockException, TxnOpenException, MetaException { try { Connection dbConn = null; Statement stmt = null; + long extLockId = rqst.getLockid(); try { - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); - // Odd as it seems, we need to heartbeat first because this touches the - // lock table and assures that our locks our still valid. If they are - // not, this will throw an exception and the heartbeat will fail. 
- long extLockId = rqst.getLockid(); - heartbeatLock(dbConn, extLockId); - long txnid = getTxnIdFromLockId(dbConn, extLockId); - // If there is a valid txnid, throw an exception, - // as locks associated with transactions should be unlocked only when the - // transaction is committed or aborted. - if (txnid > 0) { - LOG.debug("Going to rollback"); - dbConn.rollback(); - String msg = "Unlocking locks associated with transaction" + - " not permitted. Lockid " + JavaUtils.lockIdToString(extLockId) + " is associated with " + - "transaction " + JavaUtils.txnIdToString(txnid); - LOG.error(msg); - throw new TxnOpenException(msg); - } + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "delete from HIVE_LOCKS where hl_lock_ext_id = " + extLockId; + //hl_txnid > 0 means it's associated with a transaction + String s = "delete from HIVE_LOCKS where hl_lock_ext_id = " + extLockId + " AND hl_txnid < 1"; LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); if (rc < 1) { + Long txnid = getTxnIdFromLockId(dbConn, extLockId); LOG.debug("Going to rollback"); dbConn.rollback(); - throw new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId)); + if(txnid == null) { + throw new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId)); + } + if(txnid > 0) { + String msg = "Unlocking locks associated with transaction" + + " not permitted. Lockid " + JavaUtils.lockIdToString(extLockId) + " is associated with " + + "transaction " + JavaUtils.txnIdToString(txnid); + LOG.error(msg); + throw new TxnOpenException(msg); + } + if(txnid == 0) { + //we didn't see this lock when running DELETE stmt above but now it showed up + //so should "should never happen" happen... 
+ String msg = "Found lock " + JavaUtils.lockIdToString(extLockId) + " with " + JavaUtils.txnIdToString(txnid); + LOG.error(msg); + throw new MetaException(msg); + } } LOG.debug("Going to commit"); dbConn.commit(); @@ -530,7 +613,7 @@ public void unlock(UnlockRequest rqst) rollbackDBConn(dbConn); checkRetryable(dbConn, e, "unlock(" + rqst + ")"); throw new MetaException("Unable to update transaction database " + - StringUtils.stringifyException(e)); + JavaUtils.lockIdToString(extLockId) + " " + StringUtils.stringifyException(e)); } finally { closeStmt(stmt); closeDbConn(dbConn); @@ -615,6 +698,10 @@ public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException { } } + /** + * {@code ids} should only have txnid or lockid but not both, ideally. + * Currently DBTxnManager.heartbeat() enforces this. + */ public void heartbeat(HeartbeatRequest ids) throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException { try { @@ -637,6 +724,14 @@ public void heartbeat(HeartbeatRequest ids) } } + /** + * Isolation Level note: + * There doesn't seem to be any benefit to run this at Serializable or even part of the same txn + * + * @param rqst + * @return + * @throws MetaException + */ public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst) throws MetaException { try { @@ -650,6 +745,8 @@ public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); for (long txn = rqst.getMin(); txn <= rqst.getMax(); txn++) { try { + //todo: this is expensive call: at least 2 update queries per txn + //is this really worth it? 
heartbeatTxn(dbConn, txn); } catch (NoSuchTxnException e) { nosuch.add(txn); @@ -678,11 +775,11 @@ public void compact(CompactionRequest rqst) throws MetaException { Connection dbConn = null; Statement stmt = null; try { - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + dbConn = getDbConn(getRequiredIsolationLevel()); stmt = dbConn.createStatement(); // Get the id for the next entry in the queue - String s = "select ncq_next from NEXT_COMPACTION_QUEUE_ID"; + String s = addForUpdateClause(dbConn, "select ncq_next from NEXT_COMPACTION_QUEUE_ID"); LOG.debug("going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); if (!rs.next()) { @@ -1294,7 +1391,11 @@ private void checkQFileTestHack() { } /** - * Abort a group of txns + * Running this at SERIALIZABLE prevents new locks being added for this txnid(s) concurrently + * which would cause them to become orphaned. + * + * TODO: expose this as an operation to client. Useful for streaming API to abort all remaining + * trasnactions in a batch on IOExceptions. * @param dbConn An active connection * @param txnids list of transactions to abort * @return Number of aborted transactions @@ -1340,6 +1441,27 @@ private int abortTxns(Connection dbConn, List txnids) throws SQLException } /** + * Isolation Level Notes: + * Separate standalone txn to reserve range of LOCK_IDs. If MS crashes after that, we just \ + * wasted 1 ID, that's it. Otherwise run at SERIALIZABLE to make sure no one is adding + * new locks while we are check conflicts here. + * + * Ramblings: + * We could perhaps get away with writing to TXN_COMPONENTS + HIVE_LOCKS in 1 txn@RC + * since this is just in Wait state. + * (Then we'd need to ensure that in !wait case we don't rely on rollback and again in case of + * failure, the W locks will timeout if failure does not propagate to client in some way, or it + * will and client will Abort). 
+ * Actually, whether we can do this depends on what happens when you try to get a lock and notice + * a conflicting locks in W mode do we wait in this case? if so it's a problem because while you + * are checking new locks someone may insert new W locks that you don't see... + * On the other hand, this attempts to be 'fair', i.e. process locks in order so could we assume + * that additional W locks will have higher IDs???? + * + * We can use Select for Update to generate the next LockID. In fact we can easily do this in a separate txn. + * This avoids contention on NEXT_LOCK_ID. The rest of the logic will be still need to be done at Serializable, I think, + * but it will not be updating the same row from 2 DB. + * * Request a lock * @param dbConn database connection * @param rqst lock information @@ -1357,6 +1479,9 @@ private int abortTxns(Connection dbConn, List txnids) throws SQLException */ private LockResponse lock(Connection dbConn, LockRequest rqst, boolean wait) throws NoSuchTxnException, TxnAbortedException, MetaException, SQLException { + + long extLockId = generateNewExtLockId(); + // We want to minimize the number of concurrent lock requests being issued. If we do not we // get a large number of deadlocks in the database, since this method has to both clean // timedout locks and insert new locks. This synchronization barrier will not eliminate all @@ -1370,23 +1495,7 @@ private LockResponse lock(Connection dbConn, LockRequest rqst, boolean wait) Statement stmt = null; try { stmt = dbConn.createStatement(); - - // Get the next lock id. 
- String s = "select nl_next from NEXT_LOCK_ID"; - LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - if (!rs.next()) { - LOG.debug("Going to rollback"); - dbConn.rollback(); - throw new MetaException("Transaction tables not properly " + - "initialized, no record found in next_lock_id"); - } - long extLockId = rs.getLong(1); - s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1); - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); - LOG.debug("Going to commit."); - dbConn.commit(); + String s = null; long txnid = rqst.getTxnid(); if (txnid > 0) { @@ -1397,6 +1506,14 @@ private LockResponse lock(Connection dbConn, LockRequest rqst, boolean wait) // For each component in this lock request, // add an entry to the txn_components table // This must be done before HIVE_LOCKS is accessed + + //Isolation note: + //the !wait option is not actually used anywhere. W/o that, + // if we make CompactionTxnHandler.markCleaned() not delete anything above certain txn_id + //then there is no reason why this insert into TXN_COMPONENTS needs to run at Serializable. + // + // Again, w/o the !wait option, insert into HIVE_LOCKS should be OK at READ_COMMITTED as long + //as check lock is at serializable (or any other way to make sure it's exclusive) for (LockComponent lc : rqst.getComponent()) { String dbName = lc.getDbname(); String tblName = lc.getTablename(); @@ -1429,6 +1546,7 @@ private LockResponse lock(Connection dbConn, LockRequest rqst, boolean wait) " (hl_lock_ext_id, hl_lock_int_id, hl_txnid, hl_db, hl_table, " + "hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, hl_user, hl_host)" + " values (" + extLockId + ", " + + //Unfortunately, default txnid value in DbTxnManager is 0, so we never set NULL here. + intLockId + "," + (txnid >= 0 ? txnid : "null") + ", '" + dbName + "', " + (tblName == null ? "null" : "'" + tblName + "'" ) + ", " + (partName == null ? 
"null" : "'" + partName + "'") + @@ -1731,7 +1849,6 @@ private void heartbeatTxn(Connection dbConn, long txnid) try { stmt = dbConn.createStatement(); long now = getDbTime(dbConn); - ensureValidTxn(dbConn, txnid, stmt); String s = "update TXNS set txn_last_heartbeat = " + now + " where txn_id = " + txnid + " and txn_state = '" + TXN_OPEN + "'"; LOG.debug("Going to execute update <" + s + ">"); @@ -1760,6 +1877,7 @@ private static void ensureValidTxn(Connection dbConn, long txnid, Statement stmt LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); if (!rs.next()) { + //todo: add LIMIT 1 instead of count - should be more efficient s = "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TXNID = " + txnid; ResultSet rs2 = stmt.executeQuery(s); boolean alreadyCommitted = rs2.next() && rs2.getInt(1) > 0; @@ -1775,28 +1893,28 @@ private static void ensureValidTxn(Connection dbConn, long txnid, Statement stmt LOG.debug("Going to rollback"); dbConn.rollback(); throw new TxnAbortedException("Transaction " + JavaUtils.txnIdToString(txnid) + - " already aborted");//todo: add time of abort, which is not currently tracked + " already aborted");//todo: add time of abort, which is not currently tracked. Requires schema change } } - // NEVER call this function without first calling heartbeat(long, long) - private long getTxnIdFromLockId(Connection dbConn, long extLockId) + private Long getTxnIdFromLockId(Connection dbConn, long extLockId) throws NoSuchLockException, MetaException, SQLException { Statement stmt = null; + ResultSet rs = null; try { stmt = dbConn.createStatement(); String s = "select hl_txnid from HIVE_LOCKS where hl_lock_ext_id = " + extLockId; LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); + rs = stmt.executeQuery(s); if (!rs.next()) { - throw new MetaException("This should never happen! 
We already " + - "checked the lock existed but now we can't find it!"); + return null; } long txnid = rs.getLong(1); - LOG.debug("Return " + JavaUtils.txnIdToString(rs.wasNull() ? -1 : txnid)); - return (rs.wasNull() ? -1 : txnid); + LOG.debug("getTxnIdFromLockId(" + extLockId + ") Return " + JavaUtils.txnIdToString(txnid)); + return txnid; } finally { + close(rs); closeStmt(stmt); } } @@ -1839,7 +1957,7 @@ private void timeOutLocks(Connection dbConn) { stmt = dbConn.createStatement(); // Remove any timed out locks from the table. String s = "delete from HIVE_LOCKS where hl_last_heartbeat < " + - (now - timeout) + " and (hl_txnid = 0 or hl_txnid is NULL)";//when txnid is > 0, the lock is + (now - timeout) + " and hl_txnid = 0";//when txnid is > 0, the lock is //associated with a txn and is handled by performTimeOuts() //want to avoid expiring locks for a txn w/o expiring the txn itself LOG.debug("Going to execute update <" + s + ">"); @@ -1891,6 +2009,8 @@ private String addLimitClause(Connection dbConn, int numRows, String noSelectsql } } /** + * Isolation Level Notes + * Plain: RC is OK * This will find transactions that have timed out and abort them. * Will also delete locks which are not associated with a transaction and have timed out * Tries to keep transactions (against metastore db) small to reduce lock contention. @@ -1900,7 +2020,17 @@ public void performTimeOuts() { Statement stmt = null; ResultSet rs = null; try { - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + //We currently commit after selecting the TXNS to abort. So whether SERIALIZABLE + //READ_COMMITTED, the effect is the same. We could use FOR UPDATE on Select from TXNS + //and do the whole performTimeOuts() in a single huge transaction, but the only benefit + //would be to make sure someone cannot heartbeat one of these txns at the same time. 
+ //The attempt to heartbeat would block and fail immediately after it's unblocked. + //With current (RC + multiple txns) implementation it is possible for someone to send + //heartbeat at the very end of the expire interval, and just after the Select from TXNS + //is made, in which case heartbeat will succeed but txn will still be Aborted. + //Solving this corner case is not worth the perf penalty. The client should heartbeat in a + //timely way. long now = getDbTime(dbConn); timeOutLocks(dbConn); while(true) { @@ -2110,4 +2240,97 @@ private static boolean isRetryable(Exception ex) { private static String getMessage(SQLException ex) { return ex.getMessage() + "(SQLState=" + ex.getSQLState() + ",ErrorCode=" + ex.getErrorCode() + ")"; } + /** + * Returns one of {@link java.sql.Connection#TRANSACTION_SERIALIZABLE} TRANSACTION_READ_COMMITTED, etc. + * Different DBs support different concurrency management options. This class relies on SELECT ... FOR UPDATE + * functionality. Where that is not available, SERIALIZABLE isolation is used. + * This method must always agree with {@link #addForUpdateClause(java.sql.Connection, String)}, in that + * if FOR UPDATE is not available, must run operation at SERIALIZABLE. + */ + private int getRequiredIsolationLevel() throws MetaException, SQLException { + if(dbProduct == null) { + Connection tmp = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + determineDatabaseProduct(tmp); + closeDbConn(tmp); + } + switch (dbProduct) { + case DERBY: + return Connection.TRANSACTION_SERIALIZABLE; + case MYSQL: + case ORACLE: + case POSTGRES: + case SQLSERVER: + return Connection.TRANSACTION_READ_COMMITTED; + default: + String msg = "Unrecognized database product name <" + dbProduct + ">"; + LOG.error(msg); + throw new MetaException(msg); + } + } + /** + * Given a {@code selectStatement}, decorated it with FOR UPDATE or semantically equivalent + * construct. If the DB doesn't support, return original select. 
This method must always + * agree with {@link #getRequiredIsolationLevel()} + */ + private String addForUpdateClause(Connection dbConn, String selectStatement) throws MetaException { + DatabaseProduct prod = determineDatabaseProduct(dbConn); + switch (prod) { + case DERBY: + //https://db.apache.org/derby/docs/10.1/ref/rrefsqlj31783.html + //sadly in Derby, FOR UPDATE doesn't mean what it should + return selectStatement; + case MYSQL: + //http://dev.mysql.com/doc/refman/5.7/en/select.html + case ORACLE: + //https://docs.oracle.com/cd/E17952_01/refman-5.6-en/select.html + case POSTGRES: + //http://www.postgresql.org/docs/9.0/static/sql-select.html + return selectStatement + " for update"; + case SQLSERVER: + //https://msdn.microsoft.com/en-us/library/ms189499.aspx + //https://msdn.microsoft.com/en-us/library/ms187373.aspx + return selectStatement + " with(updlock)"; + default: + String msg = "Unrecognized database product name <" + prod + ">"; + LOG.error(msg); + throw new MetaException(msg); + } + } + /** + * the caller is expected to retry if this fails + * + * @return + * @throws SQLException + * @throws MetaException + */ + private long generateNewExtLockId() throws SQLException, MetaException { + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + try { + dbConn = getDbConn(getRequiredIsolationLevel()); + stmt = dbConn.createStatement(); + + // Get the next lock id. 
+ String s = addForUpdateClause(dbConn, "select nl_next from NEXT_LOCK_ID"); + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (!rs.next()) { + LOG.debug("Going to rollback"); + dbConn.rollback(); + throw new MetaException("Transaction tables not properly " + + "initialized, no record found in next_lock_id"); + } + long extLockId = rs.getLong(1); + s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1); + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + LOG.debug("Going to commit."); + dbConn.commit(); + return extLockId; + } + finally { + close(rs, stmt, dbConn); + } + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 93c7a54..45b0ec7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -1042,7 +1042,7 @@ private int acquireLocksAndOpenTxn(boolean startTxnImplicitly) { // don't update it after that until txn completes. Thus the check for {@code initiatingTransaction} //For autoCommit=true, Read-only statements, txn is implicit, i.e. lock in the snapshot //for each statement. - recordValidTxns(); + recordValidTxns();//todo: we should only need to do this for RO query if it has ACID resources in it. } return 0;