diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index 58cfbaa..806dbdb 100644 --- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -618,7 +618,7 @@ public void testTimeOutReaper() throws Exception { } @Test - public void testHearbeat() throws Exception { + public void testHeartbeat() throws Exception { HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName2, tblName2, null); DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames2,",", endPt); StreamingConnection connection = endPt.newConnection(false, null); @@ -632,14 +632,14 @@ public void testHearbeat() throws Exception { Assert.assertEquals("Wrong nubmer of locks: " + response, 1, response.getLocks().size()); ShowLocksResponseElement lock = response.getLocks().get(0); long acquiredAt = lock.getAcquiredat(); - long heartbeatAt = lock.getAcquiredat(); + long heartbeatAt = lock.getLastheartbeat(); txnBatch.heartbeat(); response = msClient.showLocks(); Assert.assertEquals("Wrong number of locks2: " + response, 1, response.getLocks().size()); lock = response.getLocks().get(0); Assert.assertEquals("Acquired timestamp didn't match", acquiredAt, lock.getAcquiredat()); Assert.assertTrue("Expected new heartbeat (" + lock.getLastheartbeat() + - ") > old heartbeat(" + heartbeatAt +")", lock.getLastheartbeat() > heartbeatAt); + ") == old heartbeat(" + heartbeatAt +")", lock.getLastheartbeat() == heartbeatAt); } @Test public void testTransactionBatchEmptyAbort() throws Exception { diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 4d9e8ae..5e4c7be 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -56,6 +56,7 @@ public CompactionTxnHandler(HiveConf conf) { Connection dbConn = null; Set<CompactionInfo> response = new HashSet<CompactionInfo>(); Statement stmt = null; + ResultSet rs = null; try { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); @@ -64,7 +65,7 @@ public CompactionTxnHandler(HiveConf conf) { String s = "select distinct ctc_database, ctc_table, " + "ctc_partition from COMPLETED_TXN_COMPONENTS"; LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); + rs = stmt.executeQuery(s); while (rs.next()) { CompactionInfo info = new CompactionInfo(); info.dbname = rs.getString(1); @@ -72,6 +73,7 @@ public CompactionTxnHandler(HiveConf conf) { info.partName = rs.getString(3); response.add(info); } + rs.close(); // Check for aborted txns s = "select tc_database, tc_table, tc_partition " + @@ -97,8 +99,7 @@ public CompactionTxnHandler(HiveConf conf) { LOG.error("Unable to connect to transaction database " + e.getMessage()); checkRetryable(dbConn, e, "findPotentialCompactions(maxAborted:" + maxAborted + ")"); } finally { - closeDbConn(dbConn); - closeStmt(stmt); + close(rs, stmt, dbConn); } return response; } @@ -118,7 +119,7 @@ public void setRunAs(long cq_id, String user) throws MetaException { Connection dbConn = null; Statement stmt = null; try { - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); String s = "update 
COMPACTION_QUEUE set cq_run_as = '" + user + "' where cq_id = " + cq_id; LOG.debug("Going to execute update <" + s + ">"); @@ -153,46 +154,58 @@ public void setRunAs(long cq_id, String user) throws MetaException { public CompactionInfo findNextToCompact(String workerId) throws MetaException { try { Connection dbConn = null; - CompactionInfo info = new CompactionInfo(); Statement stmt = null; + ResultSet rs = null; try { - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); String s = "select cq_id, cq_database, cq_table, cq_partition, " + "cq_type from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'"; LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); + rs = stmt.executeQuery(s); if (!rs.next()) { LOG.debug("No compactions found ready to compact"); dbConn.rollback(); return null; } - info.id = rs.getLong(1); - info.dbname = rs.getString(2); - info.tableName = rs.getString(3); - info.partName = rs.getString(4); - switch (rs.getString(5).charAt(0)) { - case MAJOR_TYPE: info.type = CompactionType.MAJOR; break; - case MINOR_TYPE: info.type = CompactionType.MINOR; break; - default: throw new MetaException("Unexpected compaction type " + rs.getString(5)); - } - - // Now, update this record as being worked on by this worker. - long now = getDbTime(dbConn); - s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " + - "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id; - LOG.debug("Going to execute update <" + s + ">"); - int updCount = stmt.executeUpdate(s); - if (updCount != 1) { + do { + CompactionInfo info = new CompactionInfo(); + info.id = rs.getLong(1); + info.dbname = rs.getString(2); + info.tableName = rs.getString(3); + info.partName = rs.getString(4); + switch (rs.getString(5).charAt(0)) { + case MAJOR_TYPE: + info.type = CompactionType.MAJOR; + break; + case MINOR_TYPE: + info.type = CompactionType.MINOR; + break; + default: + throw new MetaException("Unexpected compaction type " + rs.getString(5)); + } + // Now, update this record as being worked on by this worker. + long now = getDbTime(dbConn); + s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " + + "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id + + " AND cq_state='" + INITIATED_STATE + "'"; + LOG.debug("Going to execute update <" + s + ">"); + int updCount = stmt.executeUpdate(s); + if(updCount == 1) { + dbConn.commit(); + return info; + } + if(updCount == 0) { + LOG.debug("Another Worker picked up " + info); + continue; + } LOG.error("Unable to set to cq_state=" + WORKING_STATE + " for compaction record: " + - info + ". updCnt=" + updCount); - LOG.debug("Going to rollback"); + info + ". 
updCnt=" + updCount + "."); dbConn.rollback(); - } - LOG.debug("Going to commit"); - dbConn.commit(); - return info; + return null; + } while( rs.next()); + dbConn.rollback(); + return null; } catch (SQLException e) { LOG.error("Unable to select next element for compaction, " + e.getMessage()); LOG.debug("Going to rollback"); @@ -201,8 +214,7 @@ public CompactionInfo findNextToCompact(String workerId) throws MetaException { throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { - closeDbConn(dbConn); - closeStmt(stmt); + close(rs, stmt, dbConn); } } catch (RetryException e) { return findNextToCompact(workerId); @@ -219,7 +231,7 @@ public void markCompacted(CompactionInfo info) throws MetaException { Connection dbConn = null; Statement stmt = null; try { - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); String s = "update COMPACTION_QUEUE set cq_state = '" + READY_FOR_CLEANING + "', " + "cq_worker_id = null where cq_id = " + info.id; @@ -240,8 +252,8 @@ public void markCompacted(CompactionInfo info) throws MetaException { throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { - closeDbConn(dbConn); closeStmt(stmt); + closeDbConn(dbConn); } } catch (RetryException e) { markCompacted(info); @@ -258,6 +270,7 @@ public void markCompacted(CompactionInfo info) throws MetaException { List rc = new ArrayList(); Statement stmt = null; + ResultSet rs = null; try { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); @@ -265,7 +278,7 @@ public void markCompacted(CompactionInfo info) throws MetaException { String s = "select cq_id, cq_database, cq_table, cq_partition, " + "cq_type, cq_run_as from COMPACTION_QUEUE where cq_state = '" + READY_FOR_CLEANING + "'"; LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); + rs = stmt.executeQuery(s); while (rs.next()) { CompactionInfo info = new CompactionInfo(); info.id = rs.getLong(1); @@ -291,8 +304,7 @@ public void markCompacted(CompactionInfo info) throws MetaException { throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { - closeDbConn(dbConn); - closeStmt(stmt); + close(rs, stmt, dbConn); } } catch (RetryException e) { return findReadyToClean(); @@ -303,23 +315,28 @@ public void markCompacted(CompactionInfo info) throws MetaException { * This will remove an entry from the queue after * it has been compacted. * - * todo: possibly a problem? Worker will start with DB in state X (wrt this partition). + * todo: Worker will start with DB in state X (wrt this partition). * while it's working more txns will happen, against partition it's compacting. * then this will delete state up to X and since then. There may be new delta files created * between compaction starting and cleaning. These will not be compacted until more * transactions happen. So this ideally should only delete * up to TXN_ID that was compacted (i.e. HWM in Worker?) Then this can also run - * at READ_COMMITTED + * at READ_COMMITTED. So this means we'd want to store HWM in COMPACTION_QUEUE when + * Worker picks up the job. 
* * Also, by using this method when Worker fails, we prevent future compactions from - running until more data is written to tale or compaction is invoked explicitly + running until more data is written to table or compaction is invoked explicitly * @param info info on the compaction entry to remove */ public void markCleaned(CompactionInfo info) throws MetaException { try { Connection dbConn = null; Statement stmt = null; + ResultSet rs = null; try { + //do we need serializable? Once we have the HWM as above, no. Before that + //it's debatable, but the problem described above applies either way, + //thus we can drop to RC dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); stmt = dbConn.createStatement(); String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id; @@ -344,19 +361,20 @@ public void markCleaned(CompactionInfo info) throws MetaException { "marking compaction entry as clean!"); } - + //todo: add distinct in query s = "select txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" + TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and tc_table = '" + info.tableName + "'"; if (info.partName != null) s += " and tc_partition = '" + info.partName + "'"; LOG.debug("Going to execute update <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); + rs = stmt.executeQuery(s); Set<Long> txnids = new HashSet<Long>(); while (rs.next()) txnids.add(rs.getLong(1)); if (txnids.size() > 0) { // Remove entries from txn_components, as there may be aborted txn components StringBuilder buf = new StringBuilder(); + //todo: add a safeguard to make sure IN clause is not too large; break up by txn id buf.append("delete from TXN_COMPONENTS where tc_txnid in ("); boolean first = true; for (long id : txnids) { @@ -394,8 +412,7 @@ public void markCleaned(CompactionInfo info) throws MetaException { throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { - closeDbConn(dbConn); - closeStmt(stmt); + close(rs, stmt, dbConn); } } catch (RetryException e) { markCleaned(info); @@ -409,14 +426,17 @@ public void cleanEmptyAbortedTxns() throws MetaException { try { Connection dbConn = null; Statement stmt = null; + ResultSet rs = null; try { - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + //Aborted is a terminal state, so nothing about the txn can change + //after that, so READ COMMITTED is sufficient. 
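Side note on the IN-clause todo in markCleaned() above: a minimal sketch of the suggested safeguard could look like the following. This is illustrative only and not part of the patch; the class and method names and the chunk-size constant are hypothetical (many DBs, Oracle for example, cap IN lists at 1000 elements).

import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;

public class InClauseBatcher {
  // Hypothetical cap on IN-list size; Oracle, for example, allows at most 1000 elements.
  private static final int MAX_IN_CLAUSE_ELEMENTS = 1000;

  // Issues one DELETE per chunk of txnids so no single statement exceeds the DB's
  // IN-list limit. The caller owns commit/rollback, as in CompactionTxnHandler.
  static void deleteTxnComponentsInBatches(Statement stmt, List<Long> txnids)
      throws SQLException {
    for (int from = 0; from < txnids.size(); from += MAX_IN_CLAUSE_ELEMENTS) {
      int to = Math.min(from + MAX_IN_CLAUSE_ELEMENTS, txnids.size());
      StringBuilder buf = new StringBuilder("delete from TXN_COMPONENTS where tc_txnid in (");
      for (int i = from; i < to; i++) {
        if (i > from) buf.append(',');
        buf.append(txnids.get(i));
      }
      buf.append(')');
      stmt.executeUpdate(buf.toString());
    }
  }
}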
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); String s = "select txn_id from TXNS where " + "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " + "txn_state = '" + TXN_ABORTED + "'"; LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); + rs = stmt.executeQuery(s); Set<Long> txnids = new HashSet<Long>(); while (rs.next()) txnids.add(rs.getLong(1)); if (txnids.size() > 0) { @@ -443,8 +463,7 @@ public void cleanEmptyAbortedTxns() throws MetaException { throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { - closeDbConn(dbConn); - closeStmt(stmt); + close(rs, stmt, dbConn); } } catch (RetryException e) { cleanEmptyAbortedTxns(); } @@ -465,7 +484,7 @@ public void revokeFromLocalWorkers(String hostname) throws MetaException { Connection dbConn = null; Statement stmt = null; try { - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '" + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_worker_id like '" @@ -485,8 +504,8 @@ public void revokeFromLocalWorkers(String hostname) throws MetaException { throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { - closeDbConn(dbConn); closeStmt(stmt); + closeDbConn(dbConn); } } catch (RetryException e) { revokeFromLocalWorkers(hostname); } @@ -507,7 +526,7 @@ public void revokeTimedoutWorkers(long timeout) throws MetaException { Connection dbConn = null; Statement stmt = null; try { - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); long latestValidStart = getDbTime(dbConn) - timeout; stmt = dbConn.createStatement(); String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '" @@ -528,8 +547,8 @@ public void revokeTimedoutWorkers(long timeout) throws MetaException { throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); } finally { - closeDbConn(dbConn); closeStmt(stmt); + closeDbConn(dbConn); } } catch (RetryException e) { revokeTimedoutWorkers(timeout); diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 5c5e6ff..7f8cb71 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -52,6 +52,11 @@ * and {@link org.apache.hadoop.hive.common.JavaUtils#lockIdToString(long)} in all messages. * The txnid:X and lockid:Y matches how Thrift object toString() methods are generated, * so keeping the format consistent makes grep'ing the logs much easier. + * + * Note on HIVE_LOCKS.hl_last_heartbeat. 
+ * For locks that are part of a transaction, we set this to 0 (we would rather set it to NULL but + * currently the DB schema has this NOT NULL) and only update/read heartbeat from corresponding + * transaction in TXNS. */ public class TxnHandler { // Compactor states @@ -150,12 +156,20 @@ public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { // subsequently shows up in the open list that's ok. Connection dbConn = null; Statement stmt = null; + ResultSet rs = null; try { + /** + * This method can run at READ_COMMITTED as long as + * {@link #openTxns(org.apache.hadoop.hive.metastore.api.OpenTxnRequest)} is atomic. + * More specifically, as long as advancing TransactionID in NEXT_TXN_ID is atomic with + * adding corresponding entries into TXNS. The reason is that any txnid below HWM + * is either in TXNS and thus considered open (Open/Aborted) or it's considered Committed. + */ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); String s = "select ntxn_next - 1 from NEXT_TXN_ID"; LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); + rs = stmt.executeQuery(s); if (!rs.next()) { throw new MetaException("Transaction tables not properly " + "initialized, no record found in next_txn_id"); } @@ -165,7 +179,7 @@ public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { throw new MetaException("Transaction tables not properly " + "initialized, null record found in next_txn_id"); } - + close(rs); List<TxnInfo> txnInfo = new ArrayList<TxnInfo>(); //need the WHERE clause below to ensure consistent results with READ_COMMITTED s = "select txn_id, txn_state, txn_user, txn_host from TXNS where txn_id <= " + hwm; @@ -199,8 +213,7 @@ public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { throw new MetaException("Unable to select from transaction database: " + getMessage(e) + StringUtils.stringifyException(e)); } finally { - closeStmt(stmt); - closeDbConn(dbConn); + close(rs, stmt, dbConn); } } catch (RetryException e) { return getOpenTxnsInfo(); } @@ -215,12 +228,16 @@ public GetOpenTxnsResponse getOpenTxns() throws MetaException { // subsequently shows up in the open list that's ok. 
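To restate the isolation argument above as code, here is a condensed, self-contained sketch of the pattern that both getOpenTxnsInfo() and getOpenTxns() now follow. It assumes the TXNS/NEXT_TXN_ID schema used by this patch and an openTxns() that advances the counter and inserts into TXNS atomically; it is not itself part of the patch.

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashSet;
import java.util.Set;

public class OpenTxnSnapshot {
  // Returns the txnids that are open or aborted at a consistent high water mark.
  static Set<Long> openTxnsAtHwm(Connection dbConn) throws SQLException {
    Statement stmt = dbConn.createStatement();
    try {
      // High water mark: the largest txnid ever allocated.
      ResultSet rs = stmt.executeQuery("select ntxn_next - 1 from NEXT_TXN_ID");
      rs.next();
      long hwm = rs.getLong(1);
      rs.close();
      // Any txnid <= hwm is either present in TXNS (open/aborted) or committed, so
      // this WHERE clause yields a consistent snapshot even at READ_COMMITTED:
      // txns opened after the first read get ids > hwm and are simply excluded.
      Set<Long> open = new HashSet<Long>();
      rs = stmt.executeQuery("select txn_id from TXNS where txn_id <= " + hwm);
      while (rs.next()) {
        open.add(rs.getLong(1));
      }
      rs.close();
      return open;
    } finally {
      stmt.close();
    }
  }
}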
Connection dbConn = null; Statement stmt = null; + ResultSet rs = null; try { + /** + * This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()} + */ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); String s = "select ntxn_next - 1 from NEXT_TXN_ID"; LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); + rs = stmt.executeQuery(s); if (!rs.next()) { throw new MetaException("Transaction tables not properly " + "initialized, no record found in next_txn_id"); } @@ -230,7 +247,7 @@ public GetOpenTxnsResponse getOpenTxns() throws MetaException { throw new MetaException("Transaction tables not properly " + "initialized, null record found in next_txn_id"); } - + close(rs); Set<Long> openList = new HashSet<Long>(); //need the WHERE clause below to ensure consistent results with READ_COMMITTED s = "select txn_id from TXNS where txn_id <= " + hwm; @@ -249,8 +266,7 @@ public GetOpenTxnsResponse getOpenTxns() throws MetaException { throw new MetaException("Unable to select from transaction database, " + StringUtils.stringifyException(e)); } finally { - closeStmt(stmt); - closeDbConn(dbConn); + close(rs, stmt, dbConn); } } catch (RetryException e) { return getOpenTxns(); } @@ -284,17 +300,35 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { try { Connection dbConn = null; Statement stmt = null; + ResultSet rs = null; try { - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + /** + * To make {@link #getOpenTxns()}/{@link #getOpenTxnsInfo()} work correctly, this operation must ensure + * that advancing the counter in NEXT_TXN_ID and adding appropriate entries to TXNS is atomic. + * Also, advancing the counter must work when multiple metastores are running, thus either + * SELECT ... FOR UPDATE is used or SERIALIZABLE isolation. The former is preferred since it prevents + * concurrent DB transactions being rolled back due to Write-Write conflict on NEXT_TXN_ID. + * + * In the current design, there can be several metastore instances running in a given Warehouse. + * This makes ideas like reserving a range of IDs to save trips to DB impossible. For example, + * a client may go to MS1 and start a transaction with ID 500 to update a particular row. + * Now the same client will start another transaction, except it ends up on MS2 and may get + * transaction ID 400 and update the same row. Now the merge that happens to materialize the snapshot + * on read will think the version of the row from transaction ID 500 is the latest one. + * + * Longer term we can consider running Active-Passive MS (at least wrt ACID operations). This + * setup could support a write-through cache for added performance. + */ + dbConn = getDbConn(getRequiredIsolationLevel()); // Make sure the user has not requested an insane amount of txns. 
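The SELECT ... FOR UPDATE vs. SERIALIZABLE trade-off described in the comment above can be seen with a toy two-connection experiment. Everything below is illustrative (the JDBC URL is a placeholder) and not part of the patch:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;

public class ForUpdateDemo {
  public static void main(String[] args) throws SQLException {
    String url = "jdbc:postgresql://localhost/hivemeta"; // placeholder URL
    Connection c1 = DriverManager.getConnection(url);
    Connection c2 = DriverManager.getConnection(url);
    c1.setAutoCommit(false);
    c2.setAutoCommit(false);
    c1.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
    c2.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
    // c1 takes a row lock on the single NEXT_TXN_ID row.
    ResultSet rs = c1.createStatement()
        .executeQuery("select ntxn_next from NEXT_TXN_ID for update");
    rs.next();
    long next = rs.getLong(1);
    c1.createStatement().executeUpdate("update NEXT_TXN_ID set ntxn_next = " + (next + 1));
    // If c2 (say, a second metastore) ran the same SELECT ... FOR UPDATE here, it would
    // block until c1 commits, instead of both reading the same value and one of them
    // being rolled back with a write-write conflict, as happens under SERIALIZABLE.
    c1.commit();
    c2.close();
    c1.close();
  }
}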
int maxTxns = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH); if (numTxns > maxTxns) numTxns = maxTxns; stmt = dbConn.createStatement(); - String s = "select ntxn_next from NEXT_TXN_ID"; + String s = addForUpdateClause(dbConn, "select ntxn_next from NEXT_TXN_ID"); LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); + rs = stmt.executeQuery(s); if (!rs.next()) { throw new MetaException("Transaction database not properly " + "configured, can't find next transaction id."); } @@ -312,10 +346,11 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { List<Long> txnIds = new ArrayList<Long>(numTxns); for (long i = first; i < first + numTxns; i++) { ps.setLong(1, i); + //todo: this would be more efficient with a single insert with multiple rows in values() + //need to add a safeguard to not exceed the DB's capabilities. ps.executeUpdate(); txnIds.add(i); } - LOG.debug("Going to commit"); dbConn.commit(); return new OpenTxnsResponse(txnIds); @@ -326,8 +361,7 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); } finally { - closeStmt(stmt); - closeDbConn(dbConn); + close(rs, stmt, dbConn); } } catch (RetryException e) { return openTxns(rqst); } @@ -369,6 +403,24 @@ public void commitTxn(CommitTxnRequest rqst) Connection dbConn = null; Statement stmt = null; try { + /** + * This has to run at SERIALIZABLE so that no concurrent attempt to acquire locks (insert into HIVE_LOCKS) + * can happen. Otherwise we may end up with orphaned locks. lock() and commitTxn() should not + * normally run concurrently (for the same txn) but could due to bugs in the client, which could then + * (w/o SERIALIZABLE) corrupt internal transaction manager state. Also competes with abortTxn(). + * + * Sketch of an improvement: + * Introduce a new transaction state in TXNS, state 'c'. This is a transient Committed state. + * commitTxn() would mark the txn 'c' in TXNS in an independent txn. Other operations like + * lock(), heartbeat(), etc. would raise errors for a txn in 'c' state and getOpenTxns(), etc would + * treat 'c' txn as 'open'. Then this method could run in READ COMMITTED since the + * entry for this txn in TXNS in 'c' acts like a monitor. + * Since the move to 'c' state is in one txn (to make it visible) and the rest of the + * operations in another (could even be made separate txns), there is a possibility of failure + * between the 2. Thus the AcidHouseKeeper logic to timeout txns should apply to 'c' state txns. + * + * Or perhaps: "select * from TXNS where txn_id = " + txnid + " for update" + */ dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); stmt = dbConn.createStatement(); // Before we do the commit heartbeat the txn. 
This is slightly odd in that we're going to @@ -423,7 +475,7 @@ public LockResponse lock(LockRequest rqst) Connection dbConn = null; try { dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); - return lock(dbConn, rqst, true); + return lock(dbConn, rqst); } catch (SQLException e) { LOG.debug("Going to rollback"); rollbackDBConn(dbConn); @@ -438,48 +490,49 @@ } } - public LockResponse lockNoWait(LockRequest rqst) - throws NoSuchTxnException, TxnAbortedException, MetaException { - try { - Connection dbConn = null; - try { - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); - return lock(dbConn, rqst, false); - } catch (SQLException e) { - LOG.debug("Going to rollback"); - rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "lockNoWait(" + rqst + ")"); - throw new MetaException("Unable to update transaction database " + - StringUtils.stringifyException(e)); - } finally { - closeDbConn(dbConn); - } - } catch (RetryException e) { - return lockNoWait(rqst); - } - } - + /** + * Why doesn't this get a txnid as parameter? The caller should either know the txnid or know there isn't one. + * Either way getTxnIdFromLockId() will not be needed. This would be a Thrift change. + * + * Also, when lock acquisition returns WAITING, it's retried every 15 seconds (best case, see DbLockManager.backoff(), + * in practice more often) + * which means this is heartbeating way more often than hive.txn.timeout and creating extra load on DB. + * + * Clients that operate in blocking mode can't heartbeat a lock until the lock is acquired. + * We should make CheckLockRequest include the timestamp of the last request to skip unnecessary heartbeats. Thrift change. + * + * {@link #checkLock(java.sql.Connection, long)} must run at SERIALIZABLE (make sure some lock we are checking + * against doesn't move from W to A in another txn) but this method can heartbeat in + * a separate txn at READ_COMMITTED. + */ public LockResponse checkLock(CheckLockRequest rqst) throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException { try { Connection dbConn = null; + long extLockId = rqst.getLockid(); try { - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); - long extLockId = rqst.getLockid(); - + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); // Heartbeat on the lockid first, to assure that our lock is still valid. // Then look up the lock info (hopefully in the cache). If these locks // are associated with a transaction then heartbeat on that as well. 
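For context on the WAITING/heartbeat coupling the javadoc above complains about, the blocking client's contract looks roughly like the wrapper below. This is a sketch only: the real client goes through the metastore Thrift interface rather than calling TxnHandler directly, and the 15-second figure is the best-case DbLockManager backoff.

import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.txn.TxnHandler;

public class BlockingLockClient {
  // Polls until the lock leaves the WAITING state. Every checkLock() call also
  // heartbeats the lock (or its txn) server-side, which is the extra DB load the
  // javadoc above points out.
  static LockResponse lockBlocking(TxnHandler txnHandler, LockRequest rqst) throws Exception {
    LockResponse res = txnHandler.lock(rqst);
    while (res.getState() == LockState.WAITING) {
      Thread.sleep(15 * 1000); // best case, see DbLockManager.backoff()
      res = txnHandler.checkLock(new CheckLockRequest(res.getLockid()));
    }
    return res;
  }
}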
- heartbeatLock(dbConn, extLockId); - long txnid = getTxnIdFromLockId(dbConn, extLockId); - if (txnid > 0) heartbeatTxn(dbConn, txnid); - return checkLock(dbConn, extLockId, true); + Long txnid = getTxnIdFromLockId(dbConn, extLockId); + if(txnid == null) { + throw new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId)); + } + if (txnid > 0) { + heartbeatTxn(dbConn, txnid); + } + else { + heartbeatLock(dbConn, extLockId); + } + closeDbConn(dbConn); + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + return checkLock(dbConn, extLockId); } catch (SQLException e) { LOG.debug("Going to rollback"); rollbackDBConn(dbConn); checkRetryable(dbConn, e, "checkLock(" + rqst + " )"); throw new MetaException("Unable to update transaction database " + - StringUtils.stringifyException(e)); + JavaUtils.lockIdToString(extLockId) + " " + StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); } @@ -489,39 +542,56 @@ public LockResponse checkLock(CheckLockRequest rqst) } + /** + * This would have been made simpler if all locks were associated with a txn. Then only the txn needs to + * be heartbeated, committed, etc., with no need for the client to track individual locks. + */ public void unlock(UnlockRequest rqst) throws NoSuchLockException, TxnOpenException, MetaException { try { Connection dbConn = null; Statement stmt = null; + long extLockId = rqst.getLockid(); try { - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); - // Odd as it seems, we need to heartbeat first because this touches the - // lock table and assures that our locks our still valid. If they are - // not, this will throw an exception and the heartbeat will fail. - long extLockId = rqst.getLockid(); - heartbeatLock(dbConn, extLockId); - long txnid = getTxnIdFromLockId(dbConn, extLockId); - // If there is a valid txnid, throw an exception, - // as locks associated with transactions should be unlocked only when the - // transaction is committed or aborted. - if (txnid > 0) { - LOG.debug("Going to rollback"); - dbConn.rollback(); - String msg = "Unlocking locks associated with transaction" + - " not permitted. Lockid " + JavaUtils.lockIdToString(extLockId) + " is associated with " + - "transaction " + JavaUtils.txnIdToString(txnid); - LOG.error(msg); - throw new TxnOpenException(msg); - } + /** + * This method is logically like commit for read-only auto commit queries. + * READ_COMMITTED since this only has 1 delete statement and no new entries with the + * same hl_lock_ext_id can be added, i.e. all rows with a given hl_lock_ext_id are + * created in a single atomic operation. + * Theoretically, this competes with {@link #lock(org.apache.hadoop.hive.metastore.api.LockRequest)} + * but hl_lock_ext_id is not known until that method returns. + * Also competes with {@link #checkLock(org.apache.hadoop.hive.metastore.api.CheckLockRequest)} + * but using SERIALIZABLE doesn't materially change the interaction. + * If "delete" stmt misses, additional logic is best effort to produce meaningful error msg. 
+ */ + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "delete from HIVE_LOCKS where hl_lock_ext_id = " + extLockId; + //hl_txnid <> 0 means it's associated with a transaction + String s = "delete from HIVE_LOCKS where hl_lock_ext_id = " + extLockId + " AND hl_txnid = 0"; LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); if (rc < 1) { LOG.debug("Going to rollback"); dbConn.rollback(); - throw new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId)); + Long txnid = getTxnIdFromLockId(dbConn, extLockId); + if(txnid == null) { + LOG.error("No lock found for unlock(" + rqst + ")"); + throw new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId)); + } + if(txnid != 0) { + String msg = "Unlocking locks associated with transaction" + + " not permitted. Lockid " + JavaUtils.lockIdToString(extLockId) + " is associated with " + + "transaction " + JavaUtils.txnIdToString(txnid); + LOG.error(msg); + throw new TxnOpenException(msg); + } + if(txnid == 0) { + //we didn't see this lock when running DELETE stmt above but now it showed up + //so the "should never happen" case happened... + String msg = "Found lock " + JavaUtils.lockIdToString(extLockId) + " with " + JavaUtils.txnIdToString(txnid); + LOG.error(msg); + throw new MetaException(msg); + } } LOG.debug("Going to commit"); dbConn.commit(); @@ -530,7 +600,7 @@ public void unlock(UnlockRequest rqst) rollbackDBConn(dbConn); checkRetryable(dbConn, e, "unlock(" + rqst + ")"); throw new MetaException("Unable to update transaction database " + - StringUtils.stringifyException(e)); + JavaUtils.lockIdToString(extLockId) + " " + StringUtils.stringifyException(e)); } finally { closeStmt(stmt); closeDbConn(dbConn); @@ -615,6 +685,10 @@ public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException { } } + /** + * {@code ids} should only have txnid or lockid but not both, ideally. + * Currently DBTxnManager.heartbeat() enforces this. + */ public void heartbeat(HeartbeatRequest ids) throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException { try { @@ -647,9 +721,17 @@ public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst rsp.setNosuch(nosuch); rsp.setAborted(aborted); try { - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + /** + * READ_COMMITTED is sufficient since {@link #heartbeatTxn(java.sql.Connection, long)} + * only has 1 update statement in it and + * we only update existing txns, i.e. nothing can add additional txns that this operation + * would care about (which would have required SERIALIZABLE) + */ + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); for (long txn = rqst.getMin(); txn <= rqst.getMax(); txn++) { try { + //todo: this is an expensive call: at least 2 update queries per txn + //is this really worth it? 
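One possible answer to the "is this really worth it?" todo above, sketched under the assumption that classifying the missed ids can be deferred to a follow-up SELECT. This is a suggestion, not what the patch implements:

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

public class RangeHeartbeat {
  // Heartbeats an inclusive [min, max] range of txnids with a single UPDATE instead
  // of one heartbeatTxn() round trip per id. 'o' is TxnHandler's TXN_OPEN constant,
  // so aborted and committed txns are untouched; the caller can select the range back
  // to sort the misses into nosuch/aborted. Assumes the caller supplies the DB clock
  // value (now) and commits the connection.
  static int heartbeatRange(Connection dbConn, long min, long max, long now)
      throws SQLException {
    Statement stmt = dbConn.createStatement();
    try {
      return stmt.executeUpdate("update TXNS set txn_last_heartbeat = " + now +
          " where txn_id >= " + min + " and txn_id <= " + max + " and txn_state = 'o'");
    } finally {
      stmt.close();
    }
  }
}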
heartbeatTxn(dbConn, txn); } catch (NoSuchTxnException e) { nosuch.add(txn); @@ -678,11 +760,11 @@ public void compact(CompactionRequest rqst) throws MetaException { Connection dbConn = null; Statement stmt = null; try { - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + dbConn = getDbConn(getRequiredIsolationLevel()); stmt = dbConn.createStatement(); // Get the id for the next entry in the queue - String s = "select ncq_next from NEXT_COMPACTION_QUEUE_ID"; + String s = addForUpdateClause(dbConn, "select ncq_next from NEXT_COMPACTION_QUEUE_ID"); LOG.debug("going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); if (!rs.next()) { @@ -1293,19 +1375,31 @@ private void checkQFileTestHack() { } } + private int abortTxns(Connection dbConn, List<Long> txnids) throws SQLException { + return abortTxns(dbConn, txnids, -1); + } /** - * Abort a group of txns + * TODO: expose this as an operation to the client. Useful for streaming API to abort all remaining + * transactions in a batch on IOExceptions. * @param dbConn An active connection * @param txnids list of transactions to abort + * @param max_heartbeat value used by {@link #performTimeOuts()} to ensure this doesn't abort txns which were + * heartbeated between the #performTimeOuts() select and this operation. * @return Number of aborted transactions * @throws SQLException */ - private int abortTxns(Connection dbConn, List<Long> txnids) throws SQLException { + private int abortTxns(Connection dbConn, List<Long> txnids, long max_heartbeat) throws SQLException { Statement stmt = null; int updateCnt = 0; if (txnids.isEmpty()) { return 0; } + if(Connection.TRANSACTION_SERIALIZABLE != dbConn.getTransactionIsolation()) { + /** Running this at SERIALIZABLE prevents new locks being added for this txnid(s) concurrently + * which would cause them to become orphaned. + */ + throw new IllegalStateException("Expected SERIALIZABLE isolation. Found " + dbConn.getTransactionIsolation()); + } try { stmt = dbConn.createStatement(); @@ -1321,6 +1415,8 @@ private int abortTxns(Connection dbConn, List<Long> txnids) throws SQLException LOG.debug("Going to execute update <" + buf.toString() + ">"); stmt.executeUpdate(buf.toString()); + //todo: seems like we should do this first and if it misses, don't bother with + //delete from HIVE_LOCKS since it will be rolled back buf = new StringBuilder("update TXNS set txn_state = '" + TXN_ABORTED + "' where txn_state = '" + TXN_OPEN + "' and txn_id in ("); first = true; @@ -1330,6 +1426,9 @@ private int abortTxns(Connection dbConn, List<Long> txnids) throws SQLException buf.append(id); } buf.append(')'); + if(max_heartbeat > 0) { + buf.append(" and txn_last_heartbeat < ").append(max_heartbeat); + } LOG.debug("Going to execute update <" + buf.toString() + ">"); updateCnt = stmt.executeUpdate(buf.toString()); @@ -1340,22 +1439,33 @@ private int abortTxns(Connection dbConn, List<Long> txnids) throws SQLException } /** + * Isolation Level Notes: + * Run at SERIALIZABLE to make sure no one is adding new locks while we are checking conflicts here. + * + * Ramblings: + * We could perhaps get away with writing to TXN_COMPONENTS + HIVE_LOCKS in 1 txn@RC + * since this is just in Wait state. + * (Then we'd need to ensure that in the !wait case we don't rely on rollback, and again in case of + * failure, the W locks will time out if the failure does not propagate to the client in some way, or it + * will and the client will abort). 
+ * Actually, whether we can do this depends on what happens when you try to get a lock and notice + * a conflicting lock in W mode: do we wait in this case? If so, it's a problem because while you + * are checking new locks someone may insert new W locks that you don't see... + * On the other hand, this attempts to be 'fair', i.e. process locks in order so could we assume + * that additional W locks will have higher IDs? + * + * We can use Select for Update to generate the next LockID. In fact we can easily do this in a separate txn. + * This avoids contention on NEXT_LOCK_ID. The rest of the logic will still need to be done at SERIALIZABLE, I think, + * but it will not be updating the same row from 2 DB connections. + * * Request a lock * @param dbConn database connection * @param rqst lock information - * @param wait whether to wait for this lock. The function will return immediately one way or - * another. If true and the lock could not be acquired the response will have a - * state of WAITING. The caller will then need to poll using - * {@link #checkLock(org.apache.hadoop.hive.metastore.api.CheckLockRequest)}. If - * false and the lock could not be acquired, then the response will have a state - * of NOT_ACQUIRED. The caller will need to call - * {@link #lockNoWait(org.apache.hadoop.hive.metastore.api.LockRequest)} again to - * attempt another lock. * @return information on whether the lock was acquired. * @throws NoSuchTxnException * @throws TxnAbortedException */ - private LockResponse lock(Connection dbConn, LockRequest rqst, boolean wait) + private LockResponse lock(Connection dbConn, LockRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException, SQLException { // We want to minimize the number of concurrent lock requests being issued. If we do not we // get a large number of deadlocks in the database, since this method has to both clean @@ -1368,13 +1478,25 @@ private LockResponse lock(Connection dbConn, LockRequest rqst, boolean wait) // etc.) that should not interfere with this one. synchronized (lockLock) { Statement stmt = null; + ResultSet rs = null; try { + long txnid = rqst.getTxnid(); + if (txnid > 0) { + // Heartbeat the transaction so we know it is valid and we avoid it timing out while we + // are locking. + heartbeatTxn(dbConn, txnid); + } stmt = dbConn.createStatement(); - // Get the next lock id. - String s = "select nl_next from NEXT_LOCK_ID"; + /** Get the next lock id. + * This has to be atomic with adding entries to HIVE_LOCKS (1st added in W state) to prevent a race. + * Suppose ID gen is a separate txn and 2 concurrent lock() methods are running. 1st one generates nl_next=7, + * 2nd nl_next=8. Then 8 goes first to insert into HIVE_LOCKS and acquires the locks. 
Then 7 unblocks, + * and adds its W locks, but it won't see locks from 8 since, to be 'fair', {@link #checkLock(java.sql.Connection, long)} + * doesn't block on locks acquired later than the one it's checking*/ String s = addForUpdateClause(dbConn, "select nl_next from NEXT_LOCK_ID"); LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); + rs = stmt.executeQuery(s); if (!rs.next()) { LOG.debug("Going to rollback"); dbConn.rollback(); @@ -1385,18 +1507,19 @@ private LockResponse lock(Connection dbConn, LockRequest rqst, boolean wait) s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1); LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); - LOG.debug("Going to commit."); - dbConn.commit(); - long txnid = rqst.getTxnid(); if (txnid > 0) { - // Heartbeat the transaction so we know it is valid and we avoid it timing out while we - // are locking. - heartbeatTxn(dbConn, txnid); - // For each component in this lock request, // add an entry to the txn_components table // This must be done before HIVE_LOCKS is accessed + + //Isolation note: + //the !wait option is not actually used anywhere. W/o that, + // if we make CompactionTxnHandler.markCleaned() not delete anything above a certain txn_id + //then there is no reason why this insert into TXN_COMPONENTS needs to run at Serializable. + // + // Again, w/o the !wait option, insert into HIVE_LOCKS should be OK at READ_COMMITTED as long + //as check lock is at serializable (or any other way to make sure it's exclusive) for (LockComponent lc : rqst.getComponent()) { String dbName = lc.getDbname(); String tblName = lc.getTablename(); @@ -1429,34 +1552,42 @@ " (hl_lock_ext_id, hl_lock_int_id, hl_txnid, hl_db, hl_table, " + "hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, hl_user, hl_host)" + " values (" + extLockId + ", " + - + intLockId + "," + (txnid >= 0 ? txnid : "null") + ", '" + + + intLockId + "," + txnid + ", '" + dbName + "', " + (tblName == null ? "null" : "'" + tblName + "'" ) + ", " + (partName == null ? "null" : "'" + partName + "'") + - ", '" + LOCK_WAITING + "', " + "'" + lockChar + "', " + now + ", '" + + ", '" + LOCK_WAITING + "', " + "'" + lockChar + "', " + + //for locks associated with a txn, we always heartbeat txn and timeout based on that + (isValidTxn(txnid) ? 0 : now) + ", '" + rqst.getUser() + "', '" + rqst.getHostname() + "')"; LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); } - LockResponse rsp = checkLock(dbConn, extLockId, wait); - if (!wait && rsp.getState() != LockState.ACQUIRED) { - LOG.debug("Lock not acquired, going to rollback"); - dbConn.rollback(); - rsp = new LockResponse(); - rsp.setState(LockState.NOT_ACQUIRED); - } - return rsp; + /**to make txns shorter we could commit here and start a new txn for checkLock. This would + * require moving checkRetryable() down into here. Could we then run the part before this + * commit at READ_COMMITTED?*/ + return checkLock(dbConn, extLockId); } catch (NoSuchLockException e) { // This should never happen, as we just added the lock id throw new MetaException("Couldn't find a lock we just created!"); } finally { + close(rs); closeStmt(stmt); } } } - + private static boolean isValidTxn(long txnId) { + return txnId != 0; + } + /** + * Note: this calls acquire() for (extLockId,intLockId) but extLockId is the same and we either take + * all locks for a given extLockId or none. 
Would be more efficient to update state on all locks + * at once. Semantics are the same since this is all part of the same txn@serializable. + * + * Lock acquisition is meant to be fair, so every lock can only block on some lock with smaller + * hl_lock_ext_id by only checking earlier locks. + */ private LockResponse checkLock(Connection dbConn, - long extLockId, - boolean alwaysCommit) + long extLockId) throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException, SQLException { List<LockInfo> locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId);//being acquired now LockResponse response = new LockResponse(); @@ -1609,19 +1740,15 @@ private LockResponse checkLock(Connection dbConn, case WAIT: if(!ignoreConflict(info, locks[i])) { wait(dbConn, save); - if (alwaysCommit) { - // In the case where lockNoWait has been called we don't want to commit because - // it's going to roll everything back. In every other case we want to commit here. - LOG.debug("Going to commit"); - dbConn.commit(); - } + LOG.debug("Going to commit"); + dbConn.commit(); response.setState(LockState.WAITING); LOG.debug("Lock(" + info + ") waiting for Lock(" + locks[i] + ")"); return response; } //fall through to ACQUIRE case ACQUIRE: - acquire(dbConn, stmt, extLockId, info.intLockId); + acquire(dbConn, stmt, extLockId, info); acquired = true; break; case KEEP_LOOKING: @@ -1633,7 +1760,7 @@ private LockResponse checkLock(Connection dbConn, // If we've arrived here and we have not already acquired, it means there's nothing in the // way of the lock, so acquire the lock. - if (!acquired) acquire(dbConn, stmt, extLockId, info.intLockId); + if (!acquired) acquire(dbConn, stmt, extLockId, info); } // We acquired all of the locks, so commit and return acquired. @@ -1677,26 +1804,31 @@ private void wait(Connection dbConn, Savepoint save) throws SQLException { dbConn.rollback(save); } - private void acquire(Connection dbConn, Statement stmt, long extLockId, long intLockId) + private void acquire(Connection dbConn, Statement stmt, long extLockId, LockInfo lockInfo) throws SQLException, NoSuchLockException, MetaException { long now = getDbTime(dbConn); String s = "update HIVE_LOCKS set hl_lock_state = '" + LOCK_ACQUIRED + "', " + - "hl_last_heartbeat = " + now + ", hl_acquired_at = " + now + " where hl_lock_ext_id = " + - extLockId + " and hl_lock_int_id = " + intLockId; + //if lock is part of txn, heartbeat info is in txn record + "hl_last_heartbeat = " + (isValidTxn(lockInfo.txnId) ? 0 : now) + + ", hl_acquired_at = " + now + " where hl_lock_ext_id = " + + extLockId + " and hl_lock_int_id = " + lockInfo.intLockId; LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); if (rc < 1) { LOG.debug("Going to rollback"); dbConn.rollback(); throw new NoSuchLockException("No such lock: (" + JavaUtils.lockIdToString(extLockId) + "," + - + intLockId + ")"); + + lockInfo.intLockId + ") " + JavaUtils.txnIdToString(lockInfo.txnId)); } // We update the database, but we don't commit because there may be other // locks together with this, and we only want to acquire one if we can // acquire all. } - // Heartbeats on the lock table. This commits, so do not enter it with any state + /** + * Heartbeats on the lock table. This commits, so do not enter it with any state. + * Should not be called on a lock that belongs to a transaction. 
+ */ private void heartbeatLock(Connection dbConn, long extLockId) throws NoSuchLockException, SQLException, MetaException { // If the lock id is 0, then there are no locks in this heartbeat @@ -1731,7 +1863,6 @@ private void heartbeatTxn(Connection dbConn, long txnid) try { stmt = dbConn.createStatement(); long now = getDbTime(dbConn); - ensureValidTxn(dbConn, txnid, stmt); String s = "update TXNS set txn_last_heartbeat = " + now + " where txn_id = " + txnid + " and txn_state = '" + TXN_OPEN + "'"; LOG.debug("Going to execute update <" + s + ">"); @@ -1742,10 +1873,6 @@ private void heartbeatTxn(Connection dbConn, long txnid) dbConn.rollback(); throw new NoSuchTxnException("No such txn: " + txnid); } - //update locks for this txn to the same heartbeat - s = "update HIVE_LOCKS set hl_last_heartbeat = " + now + " where hl_txnid = " + txnid; - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); LOG.debug("Going to commit"); dbConn.commit(); } finally { @@ -1760,6 +1887,7 @@ private static void ensureValidTxn(Connection dbConn, long txnid, Statement stmt LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); if (!rs.next()) { + //todo: add LIMIT 1 instead of count - should be more efficient s = "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TXNID = " + txnid; ResultSet rs2 = stmt.executeQuery(s); boolean alreadyCommitted = rs2.next() && rs2.getInt(1) > 0; @@ -1775,28 +1903,28 @@ private static void ensureValidTxn(Connection dbConn, long txnid, Statement stmt LOG.debug("Going to rollback"); dbConn.rollback(); throw new TxnAbortedException("Transaction " + JavaUtils.txnIdToString(txnid) + - " already aborted");//todo: add time of abort, which is not currently tracked + " already aborted");//todo: add time of abort, which is not currently tracked. Requires schema change } } - // NEVER call this function without first calling heartbeat(long, long) - private long getTxnIdFromLockId(Connection dbConn, long extLockId) + private Long getTxnIdFromLockId(Connection dbConn, long extLockId) throws NoSuchLockException, MetaException, SQLException { Statement stmt = null; + ResultSet rs = null; try { stmt = dbConn.createStatement(); String s = "select hl_txnid from HIVE_LOCKS where hl_lock_ext_id = " + extLockId; LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); + rs = stmt.executeQuery(s); if (!rs.next()) { - throw new MetaException("This should never happen! We already " + - "checked the lock existed but now we can't find it!"); + return null; } long txnid = rs.getLong(1); - LOG.debug("Return " + JavaUtils.txnIdToString(rs.wasNull() ? -1 : txnid)); - return (rs.wasNull() ? -1 : txnid); + LOG.debug("getTxnIdFromLockId(" + extLockId + ") Return " + JavaUtils.txnIdToString(txnid)); + return txnid; } finally { + close(rs); closeStmt(stmt); } } @@ -1832,14 +1960,13 @@ private long getTxnIdFromLockId(Connection dbConn, long extLockId) // for read-only autoCommit=true statements. This does a commit, // and thus should be done before any calls to heartbeat that will leave // open transactions. - private void timeOutLocks(Connection dbConn) { + private void timeOutLocks(Connection dbConn, long now) { Statement stmt = null; try { - long now = getDbTime(dbConn); stmt = dbConn.createStatement(); // Remove any timed out locks from the table. 
String s = "delete from HIVE_LOCKS where hl_last_heartbeat < " + - (now - timeout) + " and (hl_txnid = 0 or hl_txnid is NULL)";//when txnid is > 0, the lock is + (now - timeout) + " and hl_txnid = 0";//when txnid is > 0, the lock is //associated with a txn and is handled by performTimeOuts() //want to avoid expiring locks for a txn w/o expiring the txn itself LOG.debug("Going to execute update <" + s + ">"); @@ -1891,6 +2018,8 @@ private String addLimitClause(Connection dbConn, int numRows, String noSelectsql } } /** + * Isolation Level Notes + * Plain: RC is OK * This will find transactions that have timed out and abort them. * Will also delete locks which are not associated with a transaction and have timed out * Tries to keep transactions (against metastore db) small to reduce lock contention. @@ -1900,9 +2029,19 @@ public void performTimeOuts() { Statement stmt = null; ResultSet rs = null; try { - dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + //We currently commit after selecting the TXNS to abort. So whether SERIALIZABLE + //READ_COMMITTED, the effect is the same. We could use FOR UPDATE on Select from TXNS + //and do the whole performTimeOuts() in a single huge transaction, but the only benefit + //would be to make sure someone cannot heartbeat one of these txns at the same time. + //The attempt to heartbeat would block and fail immediately after it's unblocked. + //With current (RC + multiple txns) implementation it is possible for someone to send + //heartbeat at the very end of the expire interval, and just after the Select from TXNS + //is made, in which case heartbeat will succeed but txn will still be Aborted. + //Solving this corner case is not worth the perf penalty. The client should heartbeat in a + //timely way. long now = getDbTime(dbConn); - timeOutLocks(dbConn); + timeOutLocks(dbConn, now); while(true) { stmt = dbConn.createStatement(); String s = " txn_id from TXNS where txn_state = '" + TXN_OPEN + @@ -1923,16 +2062,26 @@ public void performTimeOuts() { timedOutTxns.add(currentBatch); } } while(rs.next()); - close(rs, stmt, null); dbConn.commit(); + close(rs, stmt, dbConn); + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + int numTxnsAborted = 0; for(List batchToAbort : timedOutTxns) { - abortTxns(dbConn, batchToAbort); - dbConn.commit(); - //todo: add TXNS.COMMENT filed and set it to 'aborted by system due to timeout' - LOG.info("Aborted the following transactions due to timeout: " + batchToAbort.toString()); + if(abortTxns(dbConn, batchToAbort, now - timeout) == batchToAbort.size()) { + dbConn.commit(); + numTxnsAborted += batchToAbort.size(); + //todo: add TXNS.COMMENT filed and set it to 'aborted by system due to timeout' + LOG.info("Aborted the following transactions due to timeout: " + batchToAbort.toString()); + } + else { + //could not abort all txns in this batch - this may happen because in parallel with this + //operation there was activity on one of the txns in this batch (commit/abort/heartbeat) + //This is not likely but may happen if client experiences long pause between heartbeats or + //unusually long/extreme pauses between heartbeat() calls and other logic in checkLock(), + //lock(), etc. 
+ dbConn.rollback(); + } } - int numTxnsAborted = (timedOutTxns.size() - 1) * TIMED_OUT_TXN_ABORT_BATCH_SIZE + - timedOutTxns.get(timedOutTxns.size() - 1).size(); LOG.info("Aborted " + numTxnsAborted + " transactions due to timeout"); } } catch (SQLException ex) { @@ -2110,4 +2259,97 @@ private static boolean isRetryable(Exception ex) { private static String getMessage(SQLException ex) { return ex.getMessage() + "(SQLState=" + ex.getSQLState() + ",ErrorCode=" + ex.getErrorCode() + ")"; } + /** + * Returns one of {@link java.sql.Connection#TRANSACTION_SERIALIZABLE}, TRANSACTION_READ_COMMITTED, etc. + * Different DBs support different concurrency management options. This class relies on SELECT ... FOR UPDATE + * functionality. Where that is not available, SERIALIZABLE isolation is used. + * This method must always agree with {@link #addForUpdateClause(java.sql.Connection, String)}, in that + * if FOR UPDATE is not available, the operation must run at SERIALIZABLE. + */ + private int getRequiredIsolationLevel() throws MetaException, SQLException { + if(dbProduct == null) { + Connection tmp = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + determineDatabaseProduct(tmp); + closeDbConn(tmp); + } + switch (dbProduct) { + case DERBY: + return Connection.TRANSACTION_SERIALIZABLE; + case MYSQL: + case ORACLE: + case POSTGRES: + case SQLSERVER: + return Connection.TRANSACTION_READ_COMMITTED; + default: + String msg = "Unrecognized database product name <" + dbProduct + ">"; + LOG.error(msg); + throw new MetaException(msg); + } + } + /** + * Given a {@code selectStatement}, decorates it with FOR UPDATE or a semantically equivalent + * construct. If the DB doesn't support it, returns the original select. This method must always + * agree with {@link #getRequiredIsolationLevel()}. + */ + private String addForUpdateClause(Connection dbConn, String selectStatement) throws MetaException { + DatabaseProduct prod = determineDatabaseProduct(dbConn); + switch (prod) { + case DERBY: + //https://db.apache.org/derby/docs/10.1/ref/rrefsqlj31783.html + //sadly in Derby, FOR UPDATE doesn't mean what it should + return selectStatement; + case MYSQL: + //http://dev.mysql.com/doc/refman/5.7/en/select.html + case ORACLE: + //https://docs.oracle.com/cd/E17952_01/refman-5.6-en/select.html + case POSTGRES: + //http://www.postgresql.org/docs/9.0/static/sql-select.html + return selectStatement + " for update"; + case SQLSERVER: + //https://msdn.microsoft.com/en-us/library/ms189499.aspx + //https://msdn.microsoft.com/en-us/library/ms187373.aspx + return selectStatement + " with(updlock)"; + default: + String msg = "Unrecognized database product name <" + prod + ">"; + LOG.error(msg); + throw new MetaException(msg); + } + } + /** + * The caller is expected to retry if this fails. + * + * @return + * @throws SQLException + * @throws MetaException + */ + private long generateNewExtLockId() throws SQLException, MetaException { + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + try { + dbConn = getDbConn(getRequiredIsolationLevel()); + stmt = dbConn.createStatement(); + + // Get the next lock id. 
String s = addForUpdateClause(dbConn, "select nl_next from NEXT_LOCK_ID"); + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (!rs.next()) { + LOG.debug("Going to rollback"); + dbConn.rollback(); + throw new MetaException("Transaction tables not properly " + + "initialized, no record found in next_lock_id"); + } + long extLockId = rs.getLong(1); + s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1); + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); + LOG.debug("Going to commit."); + dbConn.commit(); + return extLockId; + } + finally { + close(rs, stmt, dbConn); + } + } } diff --git metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java index df42f1a..06e0932 100644 --- metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java +++ metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java @@ -280,43 +280,6 @@ public void testRevokeTimedOutWorkers() throws Exception { } @Test - public void testLockNoWait() throws Exception { - // Test that we can acquire the lock alone - LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, - "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - List<LockComponent> components = new ArrayList<LockComponent>(1); - components.add(comp); - LockRequest req = new LockRequest(components, "me", "localhost"); - LockResponse res = txnHandler.lockNoWait(req); - assertTrue(res.getState() == LockState.ACQUIRED); - txnHandler.unlock(new UnlockRequest(res.getLockid())); - - // test that another lock blocks it - comp = new LockComponent(LockType.SHARED_READ, LockLevel.DB, - "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lock(req); - assertEquals(LockState.ACQUIRED, res.getState()); - - comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, - "mydb"); - comp.setTablename("mytable"); - comp.setPartitionname("mypartition"); - components.clear(); - components.add(comp); - req = new LockRequest(components, "me", "localhost"); - res = txnHandler.lockNoWait(req); - assertEquals(LockState.NOT_ACQUIRED, res.getState()); - assertEquals(1, TxnDbUtil.findNumCurrentLocks()); - } - - @Test public void testFindPotentialCompactions() throws Exception { // Test that committing unlocks long txnid = openTxn(); diff --git metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java index 59114fe..4debd04 100644 --- metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java +++ metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java @@ -1107,8 +1107,8 @@ public void showLocks() throws Exception { assertNull(lock.getPartname()); assertEquals(LockState.ACQUIRED, lock.getState()); assertEquals(LockType.EXCLUSIVE, lock.getType()); - assertTrue(begining <= lock.getLastheartbeat() && - System.currentTimeMillis() >= lock.getLastheartbeat()); + assertTrue(lock.toString(), 0 == lock.getLastheartbeat() && + lock.getTxnid() != 0); assertTrue("Expected acquired at " + lock.getAcquiredat() + " to be between " + begining + " and " + System.currentTimeMillis(), begining <= lock.getAcquiredat() && System.currentTimeMillis() >= lock.getAcquiredat()); @@ -1122,8 +1122,8 @@ 
public void showLocks() throws Exception { assertNull(lock.getPartname()); assertEquals(LockState.WAITING, lock.getState()); assertEquals(LockType.SHARED_READ, lock.getType()); - assertTrue(begining <= lock.getLastheartbeat() && - System.currentTimeMillis() >= lock.getLastheartbeat()); + assertTrue(lock.toString(), 0 == lock.getLastheartbeat() && + lock.getTxnid() != 0); assertEquals(0, lock.getAcquiredat()); assertEquals("me", lock.getUser()); assertEquals("localhost", lock.getHostname()); @@ -1135,7 +1135,7 @@ public void showLocks() throws Exception { assertEquals("yourpartition", lock.getPartname()); assertEquals(LockState.ACQUIRED, lock.getState()); assertEquals(LockType.SHARED_WRITE, lock.getType()); - assertTrue(begining <= lock.getLastheartbeat() && + assertTrue(lock.toString(), begining <= lock.getLastheartbeat() && System.currentTimeMillis() >= lock.getLastheartbeat()); assertTrue(begining <= lock.getAcquiredat() && System.currentTimeMillis() >= lock.getAcquiredat()); diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 93c7a54..45b0ec7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -1042,7 +1042,7 @@ private int acquireLocksAndOpenTxn(boolean startTxnImplicitly) { // don't update it after that until txn completes. Thus the check for {@code initiatingTransaction} //For autoCommit=true, Read-only statements, txn is implicit, i.e. lock in the snapshot //for each statement. - recordValidTxns(); + recordValidTxns();//todo: we should only need to do this for RO query if it has ACID resources in it. } return 0;
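Closing note, tying together the "Ramblings" in lock() and the new generateNewExtLockId() helper: the restructuring those comments contemplate would look roughly like the sketch below. It is written under the assumptions stated in those comments and is not something this patch commits to; insertWaitingLockRows() is a hypothetical helper.

// Hypothetical future shape of lock(); not part of this patch.
private LockResponse lockInTwoTxns(LockRequest rqst)
    throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException, SQLException {
  // Txn 1: allocate the external lock id via SELECT ... FOR UPDATE and commit right
  // away, so contention on NEXT_LOCK_ID never spans the conflict check.
  long extLockId = generateNewExtLockId();
  // Txn 2: insert the W rows; READ_COMMITTED suffices here since no other request
  // can know this extLockId yet.
  Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
  try {
    insertWaitingLockRows(dbConn, extLockId, rqst); // hypothetical helper
    dbConn.commit();
  } finally {
    closeDbConn(dbConn);
  }
  // Txn 3: conflict detection stays at the stronger isolation, per the checkLock() notes.
  dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
  try {
    return checkLock(dbConn, extLockId);
  } finally {
    closeDbConn(dbConn);
  }
}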