diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 34cb2ca..d0b95ab 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -52,7 +52,7 @@ public CompactionTxnHandler(HiveConf conf) { * or runAs set since these are only potential compactions not actual ones. */ public Set findPotentialCompactions(int maxAborted) throws MetaException { - Connection dbConn = getDbConn(); + Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); Set response = new HashSet(); try { Statement stmt = dbConn.createStatement(); @@ -105,7 +105,7 @@ public CompactionTxnHandler(HiveConf conf) { */ public void setRunAs(long cq_id, String user) throws MetaException { try { - Connection dbConn = getDbConn(); + Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); try { Statement stmt = dbConn.createStatement(); String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' where cq_id = " + cq_id; @@ -143,13 +143,13 @@ public void setRunAs(long cq_id, String user) throws MetaException { */ public CompactionInfo findNextToCompact(String workerId) throws MetaException { try { - Connection dbConn = getDbConn(); + Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); CompactionInfo info = new CompactionInfo(); try { Statement stmt = dbConn.createStatement(); String s = "select cq_id, cq_database, cq_table, cq_partition, " + - "cq_type from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "' for update"; + "cq_type from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'"; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); if (!rs.next()) { @@ -207,7 +207,7 @@ public CompactionInfo findNextToCompact(String workerId) throws MetaException { */ public void markCompacted(CompactionInfo info) throws MetaException { try { - Connection dbConn = getDbConn(); + Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); try { Statement stmt = dbConn.createStatement(); String s = "update COMPACTION_QUEUE set cq_state = '" + READY_FOR_CLEANING + "', " + @@ -246,7 +246,7 @@ public void markCompacted(CompactionInfo info) throws MetaException { * @return information on the entry in the queue. */ public List findReadyToClean() throws MetaException { - Connection dbConn = getDbConn(); + Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); List rc = new ArrayList(); try { @@ -293,7 +293,7 @@ public void markCompacted(CompactionInfo info) throws MetaException { */ public void markCleaned(CompactionInfo info) throws MetaException { try { - Connection dbConn = getDbConn(); + Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); try { Statement stmt = dbConn.createStatement(); String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id; @@ -384,7 +384,7 @@ public void markCleaned(CompactionInfo info) throws MetaException { */ public void cleanEmptyAbortedTxns() throws MetaException { try { - Connection dbConn = getDbConn(); + Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); try { Statement stmt = dbConn.createStatement(); String s = "select txn_id from TXNS where " + @@ -440,7 +440,7 @@ public void cleanEmptyAbortedTxns() throws MetaException { */ public void revokeFromLocalWorkers(String hostname) throws MetaException { try { - Connection dbConn = getDbConn(); + Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); try { Statement stmt = dbConn.createStatement(); String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '" @@ -484,7 +484,7 @@ public void revokeFromLocalWorkers(String hostname) throws MetaException { */ public void revokeTimedoutWorkers(long timeout) throws MetaException { try { - Connection dbConn = getDbConn(); + Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); long latestValidStart = getDbTime(dbConn) - timeout; try { Statement stmt = dbConn.createStatement(); diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 412eb28..6f200f8 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -66,8 +66,8 @@ static final private Log LOG = LogFactory.getLog(TxnHandler.class.getName()); static private BoneCP connPool; - private static Boolean lockLock = new Boolean("true"); // Random object to lock on for the lock - // method + private static final Boolean lockLock = new Boolean("true"); // Random object to lock on for the + // lock method /** * Number of consecutive deadlocks we have seen @@ -119,13 +119,12 @@ public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { // open transactions. To avoid needing a transaction on the underlying // database we'll look at the current transaction number first. If it // subsequently shows up in the open list that's ok. - Connection dbConn = getDbConn(); + Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); try { Statement stmt = dbConn.createStatement(); - LOG.debug("Going to execute query "); - rs = stmt.executeQuery("select txn_id, txn_state, txn_user, txn_host from TXNS"); + s = "select txn_id, txn_state, txn_user, txn_host from TXNS"; + LOG.debug("Going to execute query<" + s + ">"); + rs = stmt.executeQuery(s); while (rs.next()) { char c = rs.getString(2).charAt(0); TxnState state; @@ -179,14 +179,13 @@ public GetOpenTxnsResponse getOpenTxns() throws MetaException { // open transactions. To avoid needing a transaction on the underlying // database we'll look at the current transaction number first. If it // subsequently shows up in the open list that's ok. - Connection dbConn = getDbConn(); + Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); try { timeOutTxns(dbConn); Statement stmt = dbConn.createStatement(); - LOG.debug("Going to execute query "); - rs = stmt.executeQuery("select txn_id from TXNS"); + s = "select txn_id from TXNS"; + LOG.debug("Going to execute query<" + s + ">"); + rs = stmt.executeQuery(s); while (rs.next()) { openList.add(rs.getLong(1)); } @@ -234,7 +234,7 @@ public static ValidTxnList createValidTxnList(GetOpenTxnsResponse txns) { public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { int numTxns = rqst.getNum_txns(); try { - Connection dbConn = getDbConn(); + Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); try { // Make sure the user has not requested an insane amount of txns. int maxTxns = HiveConf.getIntVar(conf, @@ -242,16 +242,15 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { if (numTxns > maxTxns) numTxns = maxTxns; Statement stmt = dbConn.createStatement(); - LOG.debug("Going to execute query "); - ResultSet rs = stmt.executeQuery("select nl_next from NEXT_LOCK_ID " + - "for update"); + // Get the next lock id. + String s = "select nl_next from NEXT_LOCK_ID"; + LOG.debug("Going to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); if (!rs.next()) { LOG.debug("Going to rollback"); dbConn.rollback(); @@ -1113,7 +1117,7 @@ private LockResponse lock(Connection dbConn, LockRequest rqst, boolean wait) "initialized, no record found in next_lock_id"); } long extLockId = rs.getLong(1); - String s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1); + s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1); LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); LOG.debug("Going to commit."); @@ -1261,7 +1265,6 @@ private LockResponse checkLock(Connection dbConn, query.append("))"); } } - query.append(" for update"); LOG.debug("Going to execute query <" + query.toString() + ">"); ResultSet rs = stmt.executeQuery(query.toString()); @@ -1360,7 +1363,7 @@ private LockResponse checkLock(Connection dbConn, } private void wait(Connection dbConn, Savepoint save) throws SQLException { - // Need to rollback because we did a select for update but we didn't + // Need to rollback because we did a select that acquired locks but we didn't // actually update anything. Also, we may have locked some locks as // acquired that we now want to not acquire. It's ok to rollback because // once we see one wait, we're done, we won't look for more. @@ -1418,8 +1421,7 @@ private void heartbeatTxn(Connection dbConn, long txnid) Statement stmt = dbConn.createStatement(); long now = getDbTime(dbConn); // We need to check whether this transaction is valid and open - String s = "select txn_state from TXNS where txn_id = " + - txnid + " for update"; + String s = "select txn_state from TXNS where txn_id = " + txnid; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); if (!rs.next()) {