diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index ef6a94b..caad948 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -5377,126 +5377,74 @@ public Function get_function(String dbName, String funcName) // Transaction and locking methods @Override public GetOpenTxnsResponse get_open_txns() throws TException { - try { - return getTxnHandler().getOpenTxns(); - } catch (MetaException e) { - throw new TException(e); - } + return getTxnHandler().getOpenTxns(); } // Transaction and locking methods @Override public GetOpenTxnsInfoResponse get_open_txns_info() throws TException { - try { - return getTxnHandler().getOpenTxnsInfo(); - } catch (MetaException e) { - throw new TException(e); - } + return getTxnHandler().getOpenTxnsInfo(); } @Override public OpenTxnsResponse open_txns(OpenTxnRequest rqst) throws TException { - try { - return getTxnHandler().openTxns(rqst); - } catch (MetaException e) { - throw new TException(e); - } + return getTxnHandler().openTxns(rqst); } @Override public void abort_txn(AbortTxnRequest rqst) throws NoSuchTxnException, TException { - try { - getTxnHandler().abortTxn(rqst); - } catch (MetaException e) { - throw new TException(e); - } + getTxnHandler().abortTxn(rqst); } @Override public void commit_txn(CommitTxnRequest rqst) throws NoSuchTxnException, TxnAbortedException, TException { - try { - getTxnHandler().commitTxn(rqst); - } catch (MetaException e) { - throw new TException(e); - } + getTxnHandler().commitTxn(rqst); } @Override public LockResponse lock(LockRequest rqst) throws NoSuchTxnException, TxnAbortedException, TException { - try { - return getTxnHandler().lock(rqst); - } catch (MetaException e) { - throw new TException(e); - } + return getTxnHandler().lock(rqst); } @Override public LockResponse check_lock(CheckLockRequest 
rqst) throws NoSuchTxnException, TxnAbortedException, NoSuchLockException, TException { - try { - return getTxnHandler().checkLock(rqst); - } catch (MetaException e) { - throw new TException(e); - } + return getTxnHandler().checkLock(rqst); } @Override public void unlock(UnlockRequest rqst) throws NoSuchLockException, TxnOpenException, TException { - try { - getTxnHandler().unlock(rqst); - } catch (MetaException e) { - throw new TException(e); - } + getTxnHandler().unlock(rqst); } @Override public ShowLocksResponse show_locks(ShowLocksRequest rqst) throws TException { - try { - return getTxnHandler().showLocks(rqst); - } catch (MetaException e) { - throw new TException(e); - } + return getTxnHandler().showLocks(rqst); } @Override public void heartbeat(HeartbeatRequest ids) throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, TException { - try { - getTxnHandler().heartbeat(ids); - } catch (MetaException e) { - throw new TException(e); - } + getTxnHandler().heartbeat(ids); } @Override public HeartbeatTxnRangeResponse heartbeat_txn_range(HeartbeatTxnRangeRequest rqst) throws TException { - try { - return getTxnHandler().heartbeatTxnRange(rqst); - } catch (MetaException e) { - throw new TException(e); - } + return getTxnHandler().heartbeatTxnRange(rqst); } @Override public void compact(CompactionRequest rqst) throws TException { - try { - getTxnHandler().compact(rqst); - } catch (MetaException e) { - throw new TException(e); - } + getTxnHandler().compact(rqst); } @Override public ShowCompactResponse show_compact(ShowCompactRequest rqst) throws TException { - try { - return getTxnHandler().showCompact(rqst); - } catch (MetaException e) { - throw new TException(e); - } + return getTxnHandler().showCompact(rqst); } @Override diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index acfc07a..4402e4a 100644 --- 
metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -52,51 +52,58 @@ public CompactionTxnHandler(HiveConf conf) { * or runAs set since these are only potential compactions not actual ones. */ public Set findPotentialCompactions(int maxAborted) throws MetaException { - Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + Connection dbConn = null; Set response = new HashSet(); Statement stmt = null; try { - stmt = dbConn.createStatement(); - // Check for completed transactions - String s = "select distinct ctc_database, ctc_table, " + + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + // Check for completed transactions + String s = "select distinct ctc_database, ctc_table, " + "ctc_partition from COMPLETED_TXN_COMPONENTS"; - LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - while (rs.next()) { - CompactionInfo info = new CompactionInfo(); - info.dbname = rs.getString(1); - info.tableName = rs.getString(2); - info.partName = rs.getString(3); - response.add(info); - } + LOG.debug("Going to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + while (rs.next()) { + CompactionInfo info = new CompactionInfo(); + info.dbname = rs.getString(1); + info.tableName = rs.getString(2); + info.partName = rs.getString(3); + response.add(info); + } - // Check for aborted txns - s = "select tc_database, tc_table, tc_partition " + + // Check for aborted txns + s = "select tc_database, tc_table, tc_partition " + "from TXNS, TXN_COMPONENTS " + "where txn_id = tc_txnid and txn_state = '" + TXN_ABORTED + "' " + "group by tc_database, tc_table, tc_partition " + "having count(*) > " + maxAborted; - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - while (rs.next()) { - CompactionInfo info = new CompactionInfo(); - 
info.dbname = rs.getString(1); - info.tableName = rs.getString(2); - info.partName = rs.getString(3); - info.tooManyAborts = true; - response.add(info); - } + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + while (rs.next()) { + CompactionInfo info = new CompactionInfo(); + info.dbname = rs.getString(1); + info.tableName = rs.getString(2); + info.partName = rs.getString(3); + info.tooManyAborts = true; + response.add(info); + } - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e) { - LOG.error("Unable to connect to transaction database " + e.getMessage()); - } finally { - closeDbConn(dbConn); - closeStmt(stmt); + LOG.debug("Going to rollback"); + dbConn.rollback(); + } catch (SQLException e) { + LOG.error("Unable to connect to transaction database " + e.getMessage()); + checkRetryable(dbConn, e, "findPotentialCompactions"); + } finally { + closeDbConn(dbConn); + closeStmt(stmt); + } + return response; + } + catch (RetryException e) { + return findPotentialCompactions(maxAborted); } - return response; } /** @@ -107,35 +114,31 @@ public CompactionTxnHandler(HiveConf conf) { */ public void setRunAs(long cq_id, String user) throws MetaException { try { - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Connection dbConn = null; Statement stmt = null; try { - stmt = dbConn.createStatement(); - String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' where cq_id = " + cq_id; - LOG.debug("Going to execute update <" + s + ">"); - if (stmt.executeUpdate(s) != 1) { - LOG.error("Unable to update compaction record"); - LOG.debug("Going to rollback"); - dbConn.rollback(); - } - LOG.debug("Going to commit"); - dbConn.commit(); - } catch (SQLException e) { - LOG.error("Unable to update compaction queue, " + e.getMessage()); - try { - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } - detectDeadlock(dbConn, e, "setRunAs"); - } finally { - 
closeDbConn(dbConn); - closeStmt(stmt); - } - } catch (DeadlockException e) { + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + stmt = dbConn.createStatement(); + String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' where cq_id = " + cq_id; + LOG.debug("Going to execute update <" + s + ">"); + if (stmt.executeUpdate(s) != 1) { + LOG.error("Unable to update compaction record"); + LOG.debug("Going to rollback"); + dbConn.rollback(); + } + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + LOG.error("Unable to update compaction queue, " + e.getMessage()); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "setRunAs"); + } finally { + closeDbConn(dbConn); + closeStmt(stmt); + } + } catch (RetryException e) { setRunAs(cq_id, user); - } finally { - deadlockCnt = 0; } } @@ -147,14 +150,15 @@ public void setRunAs(long cq_id, String user) throws MetaException { */ public CompactionInfo findNextToCompact(String workerId) throws MetaException { try { - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Connection dbConn = null; CompactionInfo info = new CompactionInfo(); Statement stmt = null; try { + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); stmt = dbConn.createStatement(); String s = "select cq_id, cq_database, cq_table, cq_partition, " + - "cq_type from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'"; + "cq_type from COMPACTION_QUEUE where cq_state = '" + INITIATED_STATE + "'"; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); if (!rs.next()) { @@ -175,7 +179,7 @@ public CompactionInfo findNextToCompact(String workerId) throws MetaException { // Now, update this record as being worked on by this worker. 
long now = getDbTime(dbConn); s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " + - "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id; + "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id; LOG.debug("Going to execute update <" + s + ">"); if (stmt.executeUpdate(s) != 1) { LOG.error("Unable to update compaction record"); @@ -187,38 +191,34 @@ public CompactionInfo findNextToCompact(String workerId) throws MetaException { return info; } catch (SQLException e) { LOG.error("Unable to select next element for compaction, " + e.getMessage()); - try { - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } - detectDeadlock(dbConn, e, "findNextToCompact"); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "findNextToCompact"); throw new MetaException("Unable to connect to transaction database " + - StringUtils.stringifyException(e)); + StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); closeStmt(stmt); } - } catch (DeadlockException e) { + } catch (RetryException e) { return findNextToCompact(workerId); - } finally { - deadlockCnt = 0; } } /** * This will mark an entry in the queue as compacted * and put it in the ready to clean state. - * @param info info on the compaciton entry to mark as compacted. + * @param info info on the compaction entry to mark as compacted. 
*/ public void markCompacted(CompactionInfo info) throws MetaException { try { - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Connection dbConn = null; Statement stmt = null; try { + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); stmt = dbConn.createStatement(); String s = "update COMPACTION_QUEUE set cq_state = '" + READY_FOR_CLEANING + "', " + - "cq_worker_id = null where cq_id = " + info.id; + "cq_worker_id = null where cq_id = " + info.id; LOG.debug("Going to execute update <" + s + ">"); if (stmt.executeUpdate(s) != 1) { LOG.error("Unable to update compaction record"); @@ -228,23 +228,18 @@ public void markCompacted(CompactionInfo info) throws MetaException { LOG.debug("Going to commit"); dbConn.commit(); } catch (SQLException e) { - try { - LOG.error("Unable to update compaction queue " + e.getMessage()); - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } - detectDeadlock(dbConn, e, "markCompacted"); + LOG.error("Unable to update compaction queue " + e.getMessage()); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "markCompacted"); throw new MetaException("Unable to connect to transaction database " + - StringUtils.stringifyException(e)); + StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); closeStmt(stmt); } - } catch (DeadlockException e) { + } catch (RetryException e) { markCompacted(info); - } finally { - deadlockCnt = 0; } } @@ -254,45 +249,48 @@ public void markCompacted(CompactionInfo info) throws MetaException { * @return information on the entry in the queue. 
*/ public List findReadyToClean() throws MetaException { - Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + Connection dbConn = null; List rc = new ArrayList(); Statement stmt = null; try { - stmt = dbConn.createStatement(); - String s = "select cq_id, cq_database, cq_table, cq_partition, " + + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + String s = "select cq_id, cq_database, cq_table, cq_partition, " + "cq_type, cq_run_as from COMPACTION_QUEUE where cq_state = '" + READY_FOR_CLEANING + "'"; - LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - while (rs.next()) { - CompactionInfo info = new CompactionInfo(); - info.id = rs.getLong(1); - info.dbname = rs.getString(2); - info.tableName = rs.getString(3); - info.partName = rs.getString(4); - switch (rs.getString(5).charAt(0)) { - case MAJOR_TYPE: info.type = CompactionType.MAJOR; break; - case MINOR_TYPE: info.type = CompactionType.MINOR; break; - default: throw new MetaException("Unexpected compaction type " + rs.getString(5)); + LOG.debug("Going to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + while (rs.next()) { + CompactionInfo info = new CompactionInfo(); + info.id = rs.getLong(1); + info.dbname = rs.getString(2); + info.tableName = rs.getString(3); + info.partName = rs.getString(4); + switch (rs.getString(5).charAt(0)) { + case MAJOR_TYPE: info.type = CompactionType.MAJOR; break; + case MINOR_TYPE: info.type = CompactionType.MINOR; break; + default: throw new MetaException("Unexpected compaction type " + rs.getString(5)); + } + info.runAs = rs.getString(6); + rc.add(info); } - info.runAs = rs.getString(6); - rc.add(info); - } - LOG.debug("Going to rollback"); - dbConn.rollback(); - return rc; - } catch (SQLException e) { - LOG.error("Unable to select next element for cleaning, " + e.getMessage()); - try { LOG.debug("Going to rollback"); dbConn.rollback(); - } catch 
(SQLException e1) { - } - throw new MetaException("Unable to connect to transaction database " + + return rc; + } catch (SQLException e) { + LOG.error("Unable to select next element for cleaning, " + e.getMessage()); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "findReadyToClean"); + throw new MetaException("Unable to connect to transaction database " + StringUtils.stringifyException(e)); - } finally { - closeDbConn(dbConn); - closeStmt(stmt); + } finally { + closeDbConn(dbConn); + closeStmt(stmt); + } + } catch (RetryException e) { + return findReadyToClean(); } } @@ -303,9 +301,10 @@ public void markCompacted(CompactionInfo info) throws MetaException { */ public void markCleaned(CompactionInfo info) throws MetaException { try { - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Connection dbConn = null; Statement stmt = null; try { + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); stmt = dbConn.createStatement(); String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id; LOG.debug("Going to execute update <" + s + ">"); @@ -318,20 +317,20 @@ public void markCleaned(CompactionInfo info) throws MetaException { // Remove entries from completed_txn_components as well, so we don't start looking there // again. 
s = "delete from COMPLETED_TXN_COMPONENTS where ctc_database = '" + info.dbname + "' and " + - "ctc_table = '" + info.tableName + "'"; + "ctc_table = '" + info.tableName + "'"; if (info.partName != null) { s += " and ctc_partition = '" + info.partName + "'"; } LOG.debug("Going to execute update <" + s + ">"); if (stmt.executeUpdate(s) < 1) { LOG.error("Expected to remove at least one row from completed_txn_components when " + - "marking compaction entry as clean!"); + "marking compaction entry as clean!"); } s = "select txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" + - TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and tc_table = '" + - info.tableName + "'"; + TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and tc_table = '" + + info.tableName + "'"; if (info.partName != null) s += " and tc_partition = '" + info.partName + "'"; LOG.debug("Going to execute update <" + s + ">"); ResultSet rs = stmt.executeQuery(s); @@ -371,23 +370,18 @@ public void markCleaned(CompactionInfo info) throws MetaException { LOG.debug("Going to commit"); dbConn.commit(); } catch (SQLException e) { - try { - LOG.error("Unable to delete from compaction queue " + e.getMessage()); - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } - detectDeadlock(dbConn, e, "markCleaned"); + LOG.error("Unable to delete from compaction queue " + e.getMessage()); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "markCleaned"); throw new MetaException("Unable to connect to transaction database " + - StringUtils.stringifyException(e)); + StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); closeStmt(stmt); } - } catch (DeadlockException e) { + } catch (RetryException e) { markCleaned(info); - } finally { - deadlockCnt = 0; } } @@ -396,13 +390,14 @@ public void markCleaned(CompactionInfo info) throws MetaException { */ public void cleanEmptyAbortedTxns() throws 
MetaException { try { - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Connection dbConn = null; Statement stmt = null; try { + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); stmt = dbConn.createStatement(); String s = "select txn_id from TXNS where " + - "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " + - "txn_state = '" + TXN_ABORTED + "'"; + "txn_id not in (select tc_txnid from TXN_COMPONENTS) and " + + "txn_state = '" + TXN_ABORTED + "'"; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); Set txnids = new HashSet(); @@ -425,21 +420,16 @@ public void cleanEmptyAbortedTxns() throws MetaException { } catch (SQLException e) { LOG.error("Unable to delete from txns table " + e.getMessage()); LOG.debug("Going to rollback"); - try { - dbConn.rollback(); - } catch (SQLException e1) { - } - detectDeadlock(dbConn, e, "cleanEmptyAbortedTxns"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "cleanEmptyAbortedTxns"); throw new MetaException("Unable to connect to transaction database " + - StringUtils.stringifyException(e)); + StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); closeStmt(stmt); } - } catch (DeadlockException e) { + } catch (RetryException e) { cleanEmptyAbortedTxns(); - } finally { - deadlockCnt = 0; } } @@ -454,13 +444,14 @@ public void cleanEmptyAbortedTxns() throws MetaException { */ public void revokeFromLocalWorkers(String hostname) throws MetaException { try { - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Connection dbConn = null; Statement stmt = null; try { + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); stmt = dbConn.createStatement(); String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '" - + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_worker_id like '" - + hostname + "%'"; + + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and 
cq_worker_id like '" + + hostname + "%'"; LOG.debug("Going to execute update <" + s + ">"); // It isn't an error if the following returns no rows, as the local workers could have died // with nothing assigned to them. @@ -468,24 +459,19 @@ public void revokeFromLocalWorkers(String hostname) throws MetaException { LOG.debug("Going to commit"); dbConn.commit(); } catch (SQLException e) { - try { - LOG.error("Unable to change dead worker's records back to initiated state " + - e.getMessage()); - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } - detectDeadlock(dbConn, e, "revokeFromLocalWorkers"); + LOG.error("Unable to change dead worker's records back to initiated state " + + e.getMessage()); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "revokeFromLocalWorkers"); throw new MetaException("Unable to connect to transaction database " + - StringUtils.stringifyException(e)); + StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); closeStmt(stmt); } - } catch (DeadlockException e) { + } catch (RetryException e) { revokeFromLocalWorkers(hostname); - } finally { - deadlockCnt = 0; } } @@ -500,14 +486,15 @@ public void revokeFromLocalWorkers(String hostname) throws MetaException { */ public void revokeTimedoutWorkers(long timeout) throws MetaException { try { - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); - long latestValidStart = getDbTime(dbConn) - timeout; + Connection dbConn = null; Statement stmt = null; try { + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + long latestValidStart = getDbTime(dbConn) - timeout; stmt = dbConn.createStatement(); String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '" - + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_start < " - + latestValidStart; + + INITIATED_STATE+ "' where cq_state = '" + WORKING_STATE + "' and cq_start < " + + latestValidStart; 
LOG.debug("Going to execute update <" + s + ">"); // It isn't an error if the following returns no rows, as the local workers could have died // with nothing assigned to them. @@ -515,24 +502,19 @@ public void revokeTimedoutWorkers(long timeout) throws MetaException { LOG.debug("Going to commit"); dbConn.commit(); } catch (SQLException e) { - try { - LOG.error("Unable to change dead worker's records back to initiated state " + - e.getMessage()); - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } - detectDeadlock(dbConn, e, "revokeTimedoutWorkers"); + LOG.error("Unable to change dead worker's records back to initiated state " + + e.getMessage()); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "revokeTimedoutWorkers"); throw new MetaException("Unable to connect to transaction database " + - StringUtils.stringifyException(e)); + StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); closeStmt(stmt); } - } catch (DeadlockException e) { + } catch (RetryException e) { revokeTimedoutWorkers(timeout); - } finally { - deadlockCnt = 0; } } @@ -543,53 +525,55 @@ public void revokeTimedoutWorkers(long timeout) throws MetaException { * @throws MetaException */ public List findColumnsWithStats(CompactionInfo ci) throws MetaException { - Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + Connection dbConn = null; Statement stmt = null; ResultSet rs = null; try { - String quote = getIdentifierQuoteString(dbConn); - stmt = dbConn.createStatement(); - StringBuilder bldr = new StringBuilder(); - bldr.append("SELECT ").append(quote).append("COLUMN_NAME").append(quote) + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + String quote = getIdentifierQuoteString(dbConn); + stmt = dbConn.createStatement(); + StringBuilder bldr = new StringBuilder(); + bldr.append("SELECT ").append(quote).append("COLUMN_NAME").append(quote) .append(" FROM ") 
.append(quote).append((ci.partName == null ? "TAB_COL_STATS" : "PART_COL_STATS")) - .append(quote) + .append(quote) .append(" WHERE ") .append(quote).append("DB_NAME").append(quote).append(" = '").append(ci.dbname) - .append("' AND ").append(quote).append("TABLE_NAME").append(quote) - .append(" = '").append(ci.tableName).append("'"); - if (ci.partName != null) { - bldr.append(" AND ").append(quote).append("PARTITION_NAME").append(quote).append(" = '") + .append("' AND ").append(quote).append("TABLE_NAME").append(quote) + .append(" = '").append(ci.tableName).append("'"); + if (ci.partName != null) { + bldr.append(" AND ").append(quote).append("PARTITION_NAME").append(quote).append(" = '") .append(ci.partName).append("'"); - } - String s = bldr.toString(); + } + String s = bldr.toString(); /*String s = "SELECT COLUMN_NAME FROM " + (ci.partName == null ? "TAB_COL_STATS" : "PART_COL_STATS") + " WHERE DB_NAME='" + ci.dbname + "' AND TABLE_NAME='" + ci.tableName + "'" + (ci.partName == null ? "" : " AND PARTITION_NAME='" + ci.partName + "'");*/ - LOG.debug("Going to execute <" + s + ">"); - rs = stmt.executeQuery(s); - List columns = new ArrayList(); - while(rs.next()) { - columns.add(rs.getString(1)); - } - LOG.debug("Found columns to update stats: " + columns + " on " + ci.tableName + - (ci.partName == null ? "" : "/" + ci.partName)); - dbConn.commit(); - return columns; - } catch (SQLException e) { - try { + LOG.debug("Going to execute <" + s + ">"); + rs = stmt.executeQuery(s); + List columns = new ArrayList(); + while (rs.next()) { + columns.add(rs.getString(1)); + } + LOG.debug("Found columns to update stats: " + columns + " on " + ci.tableName + + (ci.partName == null ? "" : "/" + ci.partName)); + dbConn.commit(); + return columns; + } catch (SQLException e) { LOG.error("Failed to find columns to analyze stats on for " + ci.tableName + - (ci.partName == null ? 
"" : "/" + ci.partName), e); - dbConn.rollback(); - } catch (SQLException e1) { - //nothing we can do here + (ci.partName == null ? "" : "/" + ci.partName), e); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "findColumnsWithStats"); + throw new MetaException("Unable to connect to transaction database " + + StringUtils.stringifyException(e)); + } finally { + close(rs, stmt, dbConn); } - throw new MetaException("Unable to connect to transaction database " + - StringUtils.stringifyException(e)); - } finally { - close(rs, stmt, dbConn); + } catch (RetryException ex) { + return findColumnsWithStats(ci); } } } diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 04e65ea..02735cb 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -77,7 +77,7 @@ static final private Log LOG = LogFactory.getLog(TxnHandler.class.getName()); static private DataSource connPool; - private static Boolean lockLock = new Boolean("true"); // Random object to lock on for the lock + private final static Object lockLock = new Object(); // Random object to lock on for the lock // method /** @@ -87,10 +87,13 @@ protected HiveConf conf; protected DatabaseProduct dbProduct; - // Transaction timeout, in milliseconds. + // (End user) Transaction timeout, in milliseconds. private long timeout; private String identifierQuoteString; // quotes to use for quoting tables, where necessary + private final long retryInterval; + private final int retryLimit; + private int retryNum; // DEADLOCK DETECTION AND HANDLING // A note to developers of this class. 
ALWAYS access HIVE_LOCKS before TXNS to avoid deadlock @@ -125,113 +128,122 @@ public TxnHandler(HiveConf conf) { timeout = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS); deadlockCnt = 0; buildJumpTable(); + retryInterval = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HMSHANDLERINTERVAL, TimeUnit.MILLISECONDS); + retryLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HMSHANDLERATTEMPTS); + } public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { - // We need to figure out the current transaction number and the list of - // open transactions. To avoid needing a transaction on the underlying - // database we'll look at the current transaction number first. If it - // subsequently shows up in the open list that's ok. - Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - Statement stmt = null; try { - stmt = dbConn.createStatement(); - String s = "select ntxn_next - 1 from NEXT_TXN_ID"; - LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - if (!rs.next()) { - throw new MetaException("Transaction tables not properly " + + // We need to figure out the current transaction number and the list of + // open transactions. To avoid needing a transaction on the underlying + // database we'll look at the current transaction number first. If it + // subsequently shows up in the open list that's ok. 
+ Connection dbConn = null; + Statement stmt = null; + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + String s = "select ntxn_next - 1 from NEXT_TXN_ID"; + LOG.debug("Going to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + if (!rs.next()) { + throw new MetaException("Transaction tables not properly " + "initialized, no record found in next_txn_id"); - } - long hwm = rs.getLong(1); - if (rs.wasNull()) { - throw new MetaException("Transaction tables not properly " + + } + long hwm = rs.getLong(1); + if (rs.wasNull()) { + throw new MetaException("Transaction tables not properly " + "initialized, null record found in next_txn_id"); - } - - List txnInfo = new ArrayList(); - s = "select txn_id, txn_state, txn_user, txn_host from TXNS"; - LOG.debug("Going to execute query<" + s + ">"); - rs = stmt.executeQuery(s); - while (rs.next()) { - char c = rs.getString(2).charAt(0); - TxnState state; - switch (c) { - case TXN_ABORTED: - state = TxnState.ABORTED; - break; - - case TXN_OPEN: - state = TxnState.OPEN; - break; + } - default: - throw new MetaException("Unexpected transaction state " + c + + List txnInfo = new ArrayList(); + s = "select txn_id, txn_state, txn_user, txn_host from TXNS"; + LOG.debug("Going to execute query<" + s + ">"); + rs = stmt.executeQuery(s); + while (rs.next()) { + char c = rs.getString(2).charAt(0); + TxnState state; + switch (c) { + case TXN_ABORTED: + state = TxnState.ABORTED; + break; + + case TXN_OPEN: + state = TxnState.OPEN; + break; + + default: + throw new MetaException("Unexpected transaction state " + c + " found in txns table"); + } + txnInfo.add(new TxnInfo(rs.getLong(1), state, rs.getString(3), rs.getString(4))); } - txnInfo.add(new TxnInfo(rs.getLong(1), state, rs.getString(3), rs.getString(4))); - } - LOG.debug("Going to rollback"); - dbConn.rollback(); - return new GetOpenTxnsInfoResponse(hwm, txnInfo); - } catch (SQLException e) { - try { 
LOG.debug("Going to rollback"); dbConn.rollback(); - } catch (SQLException e1) { - } - throw new MetaException("Unable to select from transaction database, " + return new GetOpenTxnsInfoResponse(hwm, txnInfo); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "getOpenTxnsInfo"); + throw new MetaException("Unable to select from transaction database: " + getMessage(e) + StringUtils.stringifyException(e)); - } finally { - closeStmt(stmt); - closeDbConn(dbConn); + } finally { + closeStmt(stmt); + closeDbConn(dbConn); + } + } catch (RetryException e) { + return getOpenTxnsInfo(); } } public GetOpenTxnsResponse getOpenTxns() throws MetaException { - // We need to figure out the current transaction number and the list of - // open transactions. To avoid needing a transaction on the underlying - // database we'll look at the current transaction number first. If it - // subsequently shows up in the open list that's ok. - Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - Statement stmt = null; try { - timeOutTxns(dbConn); - stmt = dbConn.createStatement(); - String s = "select ntxn_next - 1 from NEXT_TXN_ID"; - LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - if (!rs.next()) { - throw new MetaException("Transaction tables not properly " + + // We need to figure out the current transaction number and the list of + // open transactions. To avoid needing a transaction on the underlying + // database we'll look at the current transaction number first. If it + // subsequently shows up in the open list that's ok. 
+ Connection dbConn = null; + Statement stmt = null; + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + timeOutTxns(dbConn); + stmt = dbConn.createStatement(); + String s = "select ntxn_next - 1 from NEXT_TXN_ID"; + LOG.debug("Going to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + if (!rs.next()) { + throw new MetaException("Transaction tables not properly " + "initialized, no record found in next_txn_id"); - } - long hwm = rs.getLong(1); - if (rs.wasNull()) { - throw new MetaException("Transaction tables not properly " + + } + long hwm = rs.getLong(1); + if (rs.wasNull()) { + throw new MetaException("Transaction tables not properly " + "initialized, null record found in next_txn_id"); - } + } - Set openList = new HashSet(); - s = "select txn_id from TXNS"; - LOG.debug("Going to execute query<" + s + ">"); - rs = stmt.executeQuery(s); - while (rs.next()) { - openList.add(rs.getLong(1)); - } - LOG.debug("Going to rollback"); - dbConn.rollback(); - return new GetOpenTxnsResponse(hwm, openList); - } catch (SQLException e) { - try { + Set openList = new HashSet(); + s = "select txn_id from TXNS"; + LOG.debug("Going to execute query<" + s + ">"); + rs = stmt.executeQuery(s); + while (rs.next()) { + openList.add(rs.getLong(1)); + } LOG.debug("Going to rollback"); dbConn.rollback(); - } catch (SQLException e1) { - } - throw new MetaException("Unable to select from transaction database, " + return new GetOpenTxnsResponse(hwm, openList); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "getOpenTxns"); + throw new MetaException("Unable to select from transaction database, " + StringUtils.stringifyException(e)); - } finally { - closeStmt(stmt); - closeDbConn(dbConn); + } finally { + closeStmt(stmt); + closeDbConn(dbConn); + } + } catch (RetryException e) { + return getOpenTxns(); } } @@ -259,12 +271,13 @@ public static ValidTxnList 
createValidTxnList(GetOpenTxnsResponse txns, long cur public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { int numTxns = rqst.getNum_txns(); try { - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Connection dbConn = null; Statement stmt = null; try { + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); // Make sure the user has not requested an insane amount of txns. int maxTxns = HiveConf.getIntVar(conf, - HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH); + HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH); if (numTxns > maxTxns) numTxns = maxTxns; stmt = dbConn.createStatement(); @@ -273,7 +286,7 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { ResultSet rs = stmt.executeQuery(s); if (!rs.next()) { throw new MetaException("Transaction database not properly " + - "configured, can't find next transaction id."); + "configured, can't find next transaction id."); } long first = rs.getLong(1); s = "update NEXT_TXN_ID set ntxn_next = " + (first + numTxns); @@ -281,8 +294,8 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { stmt.executeUpdate(s); long now = getDbTime(dbConn); s = "insert into TXNS (txn_id, txn_state, txn_started, " + - "txn_last_heartbeat, txn_user, txn_host) values (?, 'o', " + now + ", " + - now + ", '" + rqst.getUser() + "', '" + rqst.getHostname() + "')"; + "txn_last_heartbeat, txn_user, txn_host) values (?, 'o', " + now + ", " + + now + ", '" + rqst.getUser() + "', '" + rqst.getHostname() + "')"; LOG.debug("Going to prepare statement <" + s + ">"); PreparedStatement ps = dbConn.prepareStatement(s); List txnIds = new ArrayList(numTxns); @@ -296,30 +309,26 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { dbConn.commit(); return new OpenTxnsResponse(txnIds); } catch (SQLException e) { - try { - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } - detectDeadlock(dbConn, e, "openTxns"); + 
LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "openTxns"); throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); } finally { closeStmt(stmt); closeDbConn(dbConn); } - } catch (DeadlockException e) { + } catch (RetryException e) { return openTxns(rqst); - } finally { - deadlockCnt = 0; } } public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException { long txnid = rqst.getTxnid(); try { - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Connection dbConn = null; try { + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); List txnids = new ArrayList(1); txnids.add(txnid); if (abortTxns(dbConn, txnids) != 1) { @@ -331,31 +340,27 @@ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaExcept LOG.debug("Going to commit"); dbConn.commit(); } catch (SQLException e) { - try { - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } - detectDeadlock(dbConn, e, "abortTxn"); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "abortTxn"); throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); } - } catch (DeadlockException e) { + } catch (RetryException e) { abortTxn(rqst); - } finally { - deadlockCnt = 0; } } public void commitTxn(CommitTxnRequest rqst) - throws NoSuchTxnException, TxnAbortedException, MetaException { + throws NoSuchTxnException, TxnAbortedException, MetaException { long txnid = rqst.getTxnid(); try { - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Connection dbConn = null; Statement stmt = null; try { + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); stmt = dbConn.createStatement(); // Before we do the commit heartbeat the txn. This is slightly odd in that we're going to // commit it, but it does two things. 
One, it makes sure the transaction is still valid. @@ -367,11 +372,11 @@ public void commitTxn(CommitTxnRequest rqst) // Move the record from txn_components into completed_txn_components so that the compactor // knows where to look to compact. String s = "insert into COMPLETED_TXN_COMPONENTS select tc_txnid, tc_database, tc_table, " + - "tc_partition from TXN_COMPONENTS where tc_txnid = " + txnid; + "tc_partition from TXN_COMPONENTS where tc_txnid = " + txnid; LOG.debug("Going to execute insert <" + s + ">"); if (stmt.executeUpdate(s) < 1) { LOG.warn("Expected to move at least one record from txn_components to " + - "completed_txn_components when committing txn!"); + "completed_txn_components when committing txn!"); } // Always access TXN_COMPONENTS before HIVE_LOCKS; @@ -388,80 +393,68 @@ public void commitTxn(CommitTxnRequest rqst) LOG.debug("Going to commit"); dbConn.commit(); } catch (SQLException e) { - try { - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } - detectDeadlock(dbConn, e, "commitTxn"); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "commitTxn"); throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { closeStmt(stmt); closeDbConn(dbConn); } - } catch (DeadlockException e) { + } catch (RetryException e) { commitTxn(rqst); - } finally { - deadlockCnt = 0; } } public LockResponse lock(LockRequest rqst) - throws NoSuchTxnException, TxnAbortedException, MetaException { + throws NoSuchTxnException, TxnAbortedException, MetaException { try { - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Connection dbConn = null; try { + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); return lock(dbConn, rqst, true); } catch (SQLException e) { - try { - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } - detectDeadlock(dbConn, e, "lock"); + LOG.debug("Going to 
rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "lock"); throw new MetaException("Unable to update transaction database " + - StringUtils.stringifyException(e)); + StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); } - } catch (DeadlockException e) { + } catch (RetryException e) { return lock(rqst); - } finally { - deadlockCnt = 0; } } public LockResponse lockNoWait(LockRequest rqst) - throws NoSuchTxnException, TxnAbortedException, MetaException { + throws NoSuchTxnException, TxnAbortedException, MetaException { try { - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Connection dbConn = null; try { + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); return lock(dbConn, rqst, false); } catch (SQLException e) { - try { - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } - detectDeadlock(dbConn, e, "lockNoWait"); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "lockNoWait"); throw new MetaException("Unable to update transaction database " + - StringUtils.stringifyException(e)); + StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); } - } catch (DeadlockException e) { + } catch (RetryException e) { return lockNoWait(rqst); - } finally { - deadlockCnt = 0; } } public LockResponse checkLock(CheckLockRequest rqst) - throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException { + throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException { try { - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Connection dbConn = null; try { + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); long extLockId = rqst.getLockid(); // Clean up timed out locks timeOutLocks(dbConn); @@ -474,31 +467,27 @@ public LockResponse checkLock(CheckLockRequest rqst) if (txnid > 0) heartbeatTxn(dbConn, txnid); return checkLock(dbConn, extLockId, true); } catch (SQLException 
e) { - try { - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } - detectDeadlock(dbConn, e, "checkLock"); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "checkLock"); throw new MetaException("Unable to update transaction database " + - StringUtils.stringifyException(e)); + StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); } - } catch (DeadlockException e) { + } catch (RetryException e) { return checkLock(rqst); - } finally { - deadlockCnt = 0; } } public void unlock(UnlockRequest rqst) - throws NoSuchLockException, TxnOpenException, MetaException { + throws NoSuchLockException, TxnOpenException, MetaException { try { - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Connection dbConn = null; Statement stmt = null; try { + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); // Odd as it seems, we need to heartbeat first because this touches the // lock table and assures that our locks our still valid. If they are // not, this will throw an exception and the heartbeat will fail. @@ -512,8 +501,8 @@ public void unlock(UnlockRequest rqst) LOG.debug("Going to rollback"); dbConn.rollback(); String msg = "Unlocking locks associated with transaction" + - " not permitted. Lockid " + extLockId + " is associated with " + - "transaction " + txnid; + " not permitted. 
Lockid " + extLockId + " is associated with " + + "transaction " + txnid; LOG.error(msg); throw new TxnOpenException(msg); } @@ -529,97 +518,96 @@ public void unlock(UnlockRequest rqst) LOG.debug("Going to commit"); dbConn.commit(); } catch (SQLException e) { - try { - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } - detectDeadlock(dbConn, e, "unlock"); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "unlock"); throw new MetaException("Unable to update transaction database " + - StringUtils.stringifyException(e)); + StringUtils.stringifyException(e)); } finally { closeStmt(stmt); closeDbConn(dbConn); } - } catch (DeadlockException e) { + } catch (RetryException e) { unlock(rqst); - } finally { - deadlockCnt = 0; } } public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException { - Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - ShowLocksResponse rsp = new ShowLocksResponse(); - List elems = new ArrayList(); - Statement stmt = null; try { - stmt = dbConn.createStatement(); + Connection dbConn = null; + ShowLocksResponse rsp = new ShowLocksResponse(); + List elems = new ArrayList(); + Statement stmt = null; + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); - String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " + + String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " + "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host from HIVE_LOCKS"; - LOG.debug("Doing to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - while (rs.next()) { - ShowLocksResponseElement e = new ShowLocksResponseElement(); - e.setLockid(rs.getLong(1)); - long txnid = rs.getLong(2); - if (!rs.wasNull()) e.setTxnid(txnid); - e.setDbname(rs.getString(3)); - e.setTablename(rs.getString(4)); - String partition = 
rs.getString(5); - if (partition != null) e.setPartname(partition); - switch (rs.getString(6).charAt(0)) { - case LOCK_ACQUIRED: e.setState(LockState.ACQUIRED); break; - case LOCK_WAITING: e.setState(LockState.WAITING); break; - default: throw new MetaException("Unknown lock state " + rs.getString(6).charAt(0)); - } - switch (rs.getString(7).charAt(0)) { - case LOCK_SEMI_SHARED: e.setType(LockType.SHARED_WRITE); break; - case LOCK_EXCLUSIVE: e.setType(LockType.EXCLUSIVE); break; - case LOCK_SHARED: e.setType(LockType.SHARED_READ); break; - default: throw new MetaException("Unknown lock type " + rs.getString(6).charAt(0)); + LOG.debug("Doing to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + while (rs.next()) { + ShowLocksResponseElement e = new ShowLocksResponseElement(); + e.setLockid(rs.getLong(1)); + long txnid = rs.getLong(2); + if (!rs.wasNull()) e.setTxnid(txnid); + e.setDbname(rs.getString(3)); + e.setTablename(rs.getString(4)); + String partition = rs.getString(5); + if (partition != null) e.setPartname(partition); + switch (rs.getString(6).charAt(0)) { + case LOCK_ACQUIRED: e.setState(LockState.ACQUIRED); break; + case LOCK_WAITING: e.setState(LockState.WAITING); break; + default: throw new MetaException("Unknown lock state " + rs.getString(6).charAt(0)); + } + switch (rs.getString(7).charAt(0)) { + case LOCK_SEMI_SHARED: e.setType(LockType.SHARED_WRITE); break; + case LOCK_EXCLUSIVE: e.setType(LockType.EXCLUSIVE); break; + case LOCK_SHARED: e.setType(LockType.SHARED_READ); break; + default: throw new MetaException("Unknown lock type " + rs.getString(6).charAt(0)); + } + e.setLastheartbeat(rs.getLong(8)); + long acquiredAt = rs.getLong(9); + if (!rs.wasNull()) e.setAcquiredat(acquiredAt); + e.setUser(rs.getString(10)); + e.setHostname(rs.getString(11)); + elems.add(e); } - e.setLastheartbeat(rs.getLong(8)); - long acquiredAt = rs.getLong(9); - if (!rs.wasNull()) e.setAcquiredat(acquiredAt); - e.setUser(rs.getString(10)); - 
e.setHostname(rs.getString(11)); - elems.add(e); - } - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e) { - throw new MetaException("Unable to select from transaction database " + + LOG.debug("Going to rollback"); + dbConn.rollback(); + } catch (SQLException e) { + checkRetryable(dbConn, e, "showLocks"); + throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); - } finally { - closeStmt(stmt); - closeDbConn(dbConn); + } finally { + closeStmt(stmt); + closeDbConn(dbConn); + } + rsp.setLocks(elems); + return rsp; + } catch (RetryException e) { + return showLocks(rqst); } - rsp.setLocks(elems); - return rsp; } public void heartbeat(HeartbeatRequest ids) - throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException { + throws NoSuchTxnException, NoSuchLockException, TxnAbortedException, MetaException { try { - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Connection dbConn = null; try { + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); heartbeatLock(dbConn, ids.getLockid()); heartbeatTxn(dbConn, ids.getTxnid()); } catch (SQLException e) { - try { - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } - detectDeadlock(dbConn, e, "heartbeat"); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "heartbeat"); throw new MetaException("Unable to select from transaction database " + - StringUtils.stringifyException(e)); + StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); } - } catch (DeadlockException e) { + } catch (RetryException e) { heartbeat(ids); } finally { deadlockCnt = 0; @@ -627,15 +615,16 @@ public void heartbeat(HeartbeatRequest ids) } public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst) - throws MetaException { + throws MetaException { try { - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); 
+ Connection dbConn = null; HeartbeatTxnRangeResponse rsp = new HeartbeatTxnRangeResponse(); Set nosuch = new HashSet(); Set aborted = new HashSet(); rsp.setNosuch(nosuch); rsp.setAborted(aborted); try { + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); for (long txn = rqst.getMin(); txn <= rqst.getMax(); txn++) { try { heartbeatTxn(dbConn, txn); @@ -647,18 +636,15 @@ public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst } return rsp; } catch (SQLException e) { - try { - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } - detectDeadlock(dbConn, e, "heartbeatTxnRange"); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "heartbeatTxnRange"); throw new MetaException("Unable to select from transaction database " + - StringUtils.stringifyException(e)); + StringUtils.stringifyException(e)); } finally { closeDbConn(dbConn); } - } catch (DeadlockException e) { + } catch (RetryException e) { return heartbeatTxnRange(rqst); } } @@ -666,9 +652,10 @@ public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst public void compact(CompactionRequest rqst) throws MetaException { // Put a compaction request in the queue. 
try { - Connection dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); + Connection dbConn = null; Statement stmt = null; try { + dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE); stmt = dbConn.createStatement(); // Get the id for the next entry in the queue @@ -679,7 +666,7 @@ public void compact(CompactionRequest rqst) throws MetaException { LOG.debug("Going to rollback"); dbConn.rollback(); throw new MetaException("Transaction tables not properly initiated, " + - "no record found in next_compaction_queue_id"); + "no record found in next_compaction_queue_id"); } long id = rs.getLong(1); s = "update NEXT_COMPACTION_QUEUE_ID set ncq_next = " + (id + 1); @@ -687,7 +674,7 @@ public void compact(CompactionRequest rqst) throws MetaException { stmt.executeUpdate(s); StringBuilder buf = new StringBuilder("insert into COMPACTION_QUEUE (cq_id, cq_database, " + - "cq_table, "); + "cq_table, "); String partName = rqst.getPartitionname(); if (partName != null) buf.append("cq_partition, "); buf.append("cq_state, cq_type"); @@ -730,71 +717,69 @@ public void compact(CompactionRequest rqst) throws MetaException { LOG.debug("Going to commit"); dbConn.commit(); } catch (SQLException e) { - try { - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e1) { - } - detectDeadlock(dbConn, e, "compact"); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "compact"); throw new MetaException("Unable to select from transaction database " + - StringUtils.stringifyException(e)); + StringUtils.stringifyException(e)); } finally { closeStmt(stmt); closeDbConn(dbConn); } - } catch (DeadlockException e) { + } catch (RetryException e) { compact(rqst); - } finally { - deadlockCnt = 0; } } public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaException { ShowCompactResponse response = new ShowCompactResponse(new ArrayList()); - Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + 
Connection dbConn = null; Statement stmt = null; try { - stmt = dbConn.createStatement(); - String s = "select cq_database, cq_table, cq_partition, cq_state, cq_type, cq_worker_id, " + + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + String s = "select cq_database, cq_table, cq_partition, cq_state, cq_type, cq_worker_id, " + "cq_start, cq_run_as from COMPACTION_QUEUE"; - LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - while (rs.next()) { - ShowCompactResponseElement e = new ShowCompactResponseElement(); - e.setDbname(rs.getString(1)); - e.setTablename(rs.getString(2)); - e.setPartitionname(rs.getString(3)); - switch (rs.getString(4).charAt(0)) { - case INITIATED_STATE: e.setState(INITIATED_RESPONSE); break; - case WORKING_STATE: e.setState(WORKING_RESPONSE); break; - case READY_FOR_CLEANING: e.setState(CLEANING_RESPONSE); break; - default: throw new MetaException("Unexpected compaction state " + rs.getString(4)); - } - switch (rs.getString(5).charAt(0)) { - case MAJOR_TYPE: e.setType(CompactionType.MAJOR); break; - case MINOR_TYPE: e.setType(CompactionType.MINOR); break; - default: throw new MetaException("Unexpected compaction type " + rs.getString(5)); + LOG.debug("Going to execute query <" + s + ">"); + ResultSet rs = stmt.executeQuery(s); + while (rs.next()) { + ShowCompactResponseElement e = new ShowCompactResponseElement(); + e.setDbname(rs.getString(1)); + e.setTablename(rs.getString(2)); + e.setPartitionname(rs.getString(3)); + switch (rs.getString(4).charAt(0)) { + case INITIATED_STATE: e.setState(INITIATED_RESPONSE); break; + case WORKING_STATE: e.setState(WORKING_RESPONSE); break; + case READY_FOR_CLEANING: e.setState(CLEANING_RESPONSE); break; + default: throw new MetaException("Unexpected compaction state " + rs.getString(4)); + } + switch (rs.getString(5).charAt(0)) { + case MAJOR_TYPE: e.setType(CompactionType.MAJOR); break; + case MINOR_TYPE: 
e.setType(CompactionType.MINOR); break; + default: throw new MetaException("Unexpected compaction type " + rs.getString(5)); + } + e.setWorkerid(rs.getString(6)); + e.setStart(rs.getLong(7)); + e.setRunAs(rs.getString(8)); + response.addToCompacts(e); } - e.setWorkerid(rs.getString(6)); - e.setStart(rs.getLong(7)); - e.setRunAs(rs.getString(8)); - response.addToCompacts(e); - } - LOG.debug("Going to rollback"); - dbConn.rollback(); - } catch (SQLException e) { - LOG.debug("Going to rollback"); - try { + LOG.debug("Going to rollback"); dbConn.rollback(); - } catch (SQLException e1) { - } - throw new MetaException("Unable to select from transaction database " + + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "showCompact"); + throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); - } finally { - closeStmt(stmt); - closeDbConn(dbConn); + } finally { + closeStmt(stmt); + closeDbConn(dbConn); + } + return response; + } catch (RetryException e) { + return showCompact(rqst); } - return response; } /** @@ -828,7 +813,7 @@ long setTimeout(long milliseconds) { return previous_timeout; } - protected class DeadlockException extends Exception { + protected class RetryException extends Exception { } @@ -839,26 +824,28 @@ long setTimeout(long milliseconds) { * @return db connection * @throws MetaException if the connection cannot be obtained */ - protected Connection getDbConn(int isolationLevel) throws MetaException { + protected Connection getDbConn(int isolationLevel) throws SQLException { + Connection dbConn = connPool.getConnection(); + dbConn.setAutoCommit(false); + dbConn.setTransactionIsolation(isolationLevel); + return dbConn; + } + + void rollbackDBConn(Connection dbConn) { try { - Connection dbConn = connPool.getConnection(); - dbConn.setAutoCommit(false); - dbConn.setTransactionIsolation(isolationLevel); - return dbConn; + if (dbConn != null) 
dbConn.rollback(); } catch (SQLException e) { - String msg = "Unable to get jdbc connection from pool, " + e.getMessage(); - throw new MetaException(msg); + LOG.warn("Failed to rollback db connection " + getMessage(e)); } } - protected void closeDbConn(Connection dbConn) { try { if (dbConn != null) dbConn.close(); } catch (SQLException e) { - LOG.warn("Failed to close db connection " + e.getMessage()); + LOG.warn("Failed to close db connection " + getMessage(e)); } } - + /** * Close statement instance. * @param stmt statement instance. @@ -867,7 +854,7 @@ protected void closeStmt(Statement stmt) { try { if (stmt != null) stmt.close(); } catch (SQLException e) { - LOG.warn("Failed to close statement " + e.getMessage()); + LOG.warn("Failed to close statement " + getMessage(e)); } } @@ -882,7 +869,7 @@ void close(ResultSet rs) { } } catch(SQLException ex) { - LOG.warn("Failed to close statement " + ex.getMessage()); + LOG.warn("Failed to close statement " + getMessage(ex)); } } @@ -895,18 +882,18 @@ void close(ResultSet rs, Statement stmt, Connection dbConn) { closeDbConn(dbConn); } /** - * Determine if an exception was a deadlock. Unfortunately there is no standard way to do + * Determine if an exception was such that it makes sense to retry. Unfortunately there is no standard way to do * this, so we have to inspect the error messages and catch the telltale signs for each * different database. * @param conn database connection * @param e exception that was thrown. * @param caller name of the method calling this - * @throws org.apache.hadoop.hive.metastore.txn.TxnHandler.DeadlockException when deadlock + * @throws org.apache.hadoop.hive.metastore.txn.TxnHandler.RetryException when a deadlock or other retryable error is * detected and retry count has not been exceeded. 
*/ - protected void detectDeadlock(Connection conn, + protected void checkRetryable(Connection conn, SQLException e, - String caller) throws DeadlockException, MetaException { + String caller) throws RetryException, MetaException { // If you change this function, remove the @Ignore from TestTxnHandler.deadlockIsDetected() // to test these changes. @@ -919,19 +906,41 @@ protected void detectDeadlock(Connection conn, determineDatabaseProduct(conn); } if (e instanceof SQLTransactionRollbackException || - ((dbProduct == DatabaseProduct.MYSQL || dbProduct == DatabaseProduct.POSTGRES || - dbProduct == DatabaseProduct.SQLSERVER) && e.getSQLState().equals("40001")) || - (dbProduct == DatabaseProduct.POSTGRES && e.getSQLState().equals("40P01")) || - (dbProduct == DatabaseProduct.ORACLE && (e.getMessage().contains("deadlock detected") - || e.getMessage().contains("can't serialize access for this transaction")))) { + ((dbProduct == DatabaseProduct.MYSQL || dbProduct == DatabaseProduct.POSTGRES || + dbProduct == DatabaseProduct.SQLSERVER) && e.getSQLState().equals("40001")) || + (dbProduct == DatabaseProduct.POSTGRES && e.getSQLState().equals("40P01")) || + (dbProduct == DatabaseProduct.ORACLE && (e.getMessage().contains("deadlock detected") + || e.getMessage().contains("can't serialize access for this transaction")))) { if (deadlockCnt++ < ALLOWED_REPEATED_DEADLOCKS) { LOG.warn("Deadlock detected in " + caller + ", trying again."); - throw new DeadlockException(); + throw new RetryException(); } else { LOG.error("Too many repeated deadlocks in " + caller + ", giving up."); deadlockCnt = 0; } } + else if(isRetryable(e)) { + //in MSSQL this means Communication Link Failure + if(retryNum++ < retryLimit) { + try { + Thread.sleep(retryInterval); + } + catch(InterruptedException ex) { + // + } + LOG.warn("Retryable error detected in " + caller + ", trying again: " + getMessage(e)); + throw new RetryException(); + } + else { + LOG.error("Fatal error. 
Retry limit (" + retryLimit + ") reached. Last error: " + getMessage(e)); + retryNum = 0; + } + } + else { + //if here, we got something that will propagate the error (rather than retry), so reset counters + deadlockCnt = 0; + retryNum = 0; + } } /** @@ -1073,10 +1082,10 @@ public boolean equals(Object other) { @Override public String toString() { return "extLockId:" + Long.toString(extLockId) + " intLockId:" + - intLockId + " txnId:" + Long.toString - (txnId) + " db:" + db + " table:" + table + " partition:" + - partition + " state:" + (state == null ? "null" : state.toString()) - + " type:" + (type == null ? "null" : type.toString()); + intLockId + " txnId:" + Long.toString + (txnId) + " db:" + db + " table:" + table + " partition:" + + partition + " state:" + (state == null ? "null" : state.toString()) + + " type:" + (type == null ? "null" : type.toString()); } } @@ -1088,11 +1097,11 @@ public boolean equals(Object other) { public int compare(LockInfo info1, LockInfo info2) { // We sort by state (acquired vs waiting) and then by extLockId. if (info1.state == LockState.ACQUIRED && - info2.state != LockState .ACQUIRED) { + info2.state != LockState .ACQUIRED) { return -1; } if (info1.state != LockState.ACQUIRED && - info2.state == LockState .ACQUIRED) { + info2.state == LockState .ACQUIRED) { return 1; } if (info1.extLockId < info2.extLockId) { @@ -1124,7 +1133,7 @@ public int compare(LockInfo info1, LockInfo info2) { private void checkQFileTestHack() { boolean hackOn = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST) || - HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEZ_TEST); + HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEZ_TEST); if (hackOn) { LOG.info("Hacking in canned values for transaction manager"); // Set up the transaction/locking db in the derby metastore @@ -1135,7 +1144,7 @@ private void checkQFileTestHack() { // We may have already created the tables and thus don't need to redo it. 
if (!e.getMessage().contains("already exists")) { throw new RuntimeException("Unable to set up transaction database for" + - " testing: " + e.getMessage()); + " testing: " + e.getMessage()); } } } @@ -1153,7 +1162,7 @@ private int abortTxns(Connection dbConn, List txnids) throws SQLException int updateCnt = 0; try { stmt = dbConn.createStatement(); - + // delete from HIVE_LOCKS first, we always access HIVE_LOCKS before TXNS StringBuilder buf = new StringBuilder("delete from HIVE_LOCKS where hl_txnid in ("); boolean first = true; @@ -1165,7 +1174,7 @@ private int abortTxns(Connection dbConn, List txnids) throws SQLException buf.append(')'); LOG.debug("Going to execute update <" + buf.toString() + ">"); stmt.executeUpdate(buf.toString()); - + buf = new StringBuilder("update TXNS set txn_state = '" + TXN_ABORTED + "' where txn_id in ("); first = true; for (Long id : txnids) { @@ -1176,7 +1185,7 @@ private int abortTxns(Connection dbConn, List txnids) throws SQLException buf.append(')'); LOG.debug("Going to execute update <" + buf.toString() + ">"); updateCnt = stmt.executeUpdate(buf.toString()); - + LOG.debug("Going to commit"); dbConn.commit(); } finally { @@ -1202,7 +1211,7 @@ private int abortTxns(Connection dbConn, List txnids) throws SQLException * @throws TxnAbortedException */ private LockResponse lock(Connection dbConn, LockRequest rqst, boolean wait) - throws NoSuchTxnException, TxnAbortedException, MetaException, SQLException { + throws NoSuchTxnException, TxnAbortedException, MetaException, SQLException { // We want to minimize the number of concurrent lock requests being issued. If we do not we // get a large number of deadlocks in the database, since this method has to both clean // timedout locks and insert new locks. 
This synchronization barrier will not eliminiate all @@ -1227,7 +1236,7 @@ private LockResponse lock(Connection dbConn, LockRequest rqst, boolean wait) LOG.debug("Going to rollback"); dbConn.rollback(); throw new MetaException("Transaction tables not properly " + - "initialized, no record found in next_lock_id"); + "initialized, no record found in next_lock_id"); } long extLockId = rs.getLong(1); s = "update NEXT_LOCK_ID set nl_next = " + (extLockId + 1); @@ -1252,8 +1261,8 @@ private LockResponse lock(Connection dbConn, LockRequest rqst, boolean wait) s = "insert into TXN_COMPONENTS " + "(tc_txnid, tc_database, tc_table, tc_partition) " + "values (" + txnid + ", '" + dbName + "', " + - (tblName == null ? "null" : "'" + tblName + "'") + ", " + - (partName == null ? "null" : "'" + partName + "'") + ")"; + (tblName == null ? "null" : "'" + tblName + "'") + ", " + + (partName == null ? "null" : "'" + partName + "'") + ")"; LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); } @@ -1275,13 +1284,13 @@ private LockResponse lock(Connection dbConn, LockRequest rqst, boolean wait) long now = getDbTime(dbConn); s = "insert into HIVE_LOCKS " + " (hl_lock_ext_id, hl_lock_int_id, hl_txnid, hl_db, hl_table, " + - "hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, hl_user, hl_host)" + - " values (" + extLockId + ", " + - + intLockId + "," + (txnid >= 0 ? txnid : "null") + ", '" + - dbName + "', " + (tblName == null ? "null" : "'" + tblName + "'" ) - + ", " + (partName == null ? "null" : "'" + partName + "'") + - ", '" + LOCK_WAITING + "', " + "'" + lockChar + "', " + now + ", '" + - rqst.getUser() + "', '" + rqst.getHostname() + "')"; + "hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, hl_user, hl_host)" + + " values (" + extLockId + ", " + + + intLockId + "," + (txnid >= 0 ? txnid : "null") + ", '" + + dbName + "', " + (tblName == null ? "null" : "'" + tblName + "'" ) + + ", " + (partName == null ? 
"null" : "'" + partName + "'") + + ", '" + LOCK_WAITING + "', " + "'" + lockChar + "', " + now + ", '" + + rqst.getUser() + "', '" + rqst.getHostname() + "')"; LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); } @@ -1305,7 +1314,7 @@ private LockResponse lock(Connection dbConn, LockRequest rqst, boolean wait) private LockResponse checkLock(Connection dbConn, long extLockId, boolean alwaysCommit) - throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException, SQLException { + throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException, SQLException { List locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId); LockResponse response = new LockResponse(); response.setLockid(extLockId); @@ -1313,8 +1322,8 @@ private LockResponse checkLock(Connection dbConn, LOG.debug("Setting savepoint"); Savepoint save = dbConn.setSavepoint(); StringBuilder query = new StringBuilder("select hl_lock_ext_id, " + - "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " + - "hl_lock_type from HIVE_LOCKS where hl_db in ("); + "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " + + "hl_lock_type from HIVE_LOCKS where hl_db in ("); Set strings = new HashSet(locksBeingChecked.size()); for (LockInfo info : locksBeingChecked) { @@ -1430,7 +1439,7 @@ private LockResponse checkLock(Connection dbConn, // lock the whole database and we need to check it. Otherwise, // check if they are operating on the same table, if not, move on. if (locks[index].table != null && locks[i].table != null - && !locks[index].table.equals(locks[i].table)) { + && !locks[index].table.equals(locks[i].table)) { continue; } @@ -1438,30 +1447,30 @@ private LockResponse checkLock(Connection dbConn, // lock the whole table and we need to check it. Otherwise, // check if they are operating on the same partition, if not, move on. 
if (locks[index].partition != null && locks[i].partition != null - && !locks[index].partition.equals(locks[i].partition)) { + && !locks[index].partition.equals(locks[i].partition)) { continue; } // We've found something that matches what we're trying to lock, // so figure out if we can lock it too. switch (jumpTable.get(locks[index].type).get(locks[i].type).get - (locks[i].state)) { - case ACQUIRE: - acquire(dbConn, stmt, extLockId, info.intLockId); - acquired = true; - break; - case WAIT: - wait(dbConn, save); - if (alwaysCommit) { - // In the case where lockNoWait has been called we don't want to commit because - // it's going to roll everything back. In every other case we want to commit here. - LOG.debug("Going to commit"); - dbConn.commit(); - } - response.setState(LockState.WAITING); - return response; - case KEEP_LOOKING: - continue; + (locks[i].state)) { + case ACQUIRE: + acquire(dbConn, stmt, extLockId, info.intLockId); + acquired = true; + break; + case WAIT: + wait(dbConn, save); + if (alwaysCommit) { + // In the case where lockNoWait has been called we don't want to commit because + // it's going to roll everything back. In every other case we want to commit here. + LOG.debug("Going to commit"); + dbConn.commit(); + } + response.setState(LockState.WAITING); + return response; + case KEEP_LOOKING: + continue; } if (acquired) break; // We've acquired this lock component, // so get out of the loop and look at the next component. 
@@ -1494,18 +1503,18 @@ private void wait(Connection dbConn, Savepoint save) throws SQLException { } private void acquire(Connection dbConn, Statement stmt, long extLockId, long intLockId) - throws SQLException, NoSuchLockException, MetaException { + throws SQLException, NoSuchLockException, MetaException { long now = getDbTime(dbConn); String s = "update HIVE_LOCKS set hl_lock_state = '" + LOCK_ACQUIRED + "', " + - "hl_last_heartbeat = " + now + ", hl_acquired_at = " + now + " where hl_lock_ext_id = " + - extLockId + " and hl_lock_int_id = " + intLockId; + "hl_last_heartbeat = " + now + ", hl_acquired_at = " + now + " where hl_lock_ext_id = " + + extLockId + " and hl_lock_int_id = " + intLockId; LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); if (rc < 1) { LOG.debug("Going to rollback"); dbConn.rollback(); throw new NoSuchLockException("No such lock: (" + extLockId + "," + - + intLockId + ")"); + + intLockId + ")"); } // We update the database, but we don't commit because there may be other // locks together with this, and we only want to acquire one if we can @@ -1514,7 +1523,7 @@ private void acquire(Connection dbConn, Statement stmt, long extLockId, long int // Heartbeats on the lock table. 
This commits, so do not enter it with any state private void heartbeatLock(Connection dbConn, long extLockId) - throws NoSuchLockException, SQLException, MetaException { + throws NoSuchLockException, SQLException, MetaException { // If the lock id is 0, then there are no locks in this heartbeat if (extLockId == 0) return; Statement stmt = null; @@ -1523,7 +1532,7 @@ private void heartbeatLock(Connection dbConn, long extLockId) long now = getDbTime(dbConn); String s = "update HIVE_LOCKS set hl_last_heartbeat = " + - now + " where hl_lock_ext_id = " + extLockId; + now + " where hl_lock_ext_id = " + extLockId; LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); if (rc < 1) { @@ -1540,7 +1549,7 @@ private void heartbeatLock(Connection dbConn, long extLockId) // Heartbeats on the txn table. This commits, so do not enter it with any state private void heartbeatTxn(Connection dbConn, long txnid) - throws NoSuchTxnException, TxnAbortedException, SQLException, MetaException { + throws NoSuchTxnException, TxnAbortedException, SQLException, MetaException { // If the txnid is 0, then there are no transactions in this heartbeat if (txnid == 0) return; Statement stmt = null; @@ -1560,10 +1569,10 @@ private void heartbeatTxn(Connection dbConn, long txnid) LOG.debug("Going to rollback"); dbConn.rollback(); throw new TxnAbortedException("Transaction " + txnid + - " already aborted"); + " already aborted"); } s = "update TXNS set txn_last_heartbeat = " + now + - " where txn_id = " + txnid; + " where txn_id = " + txnid; LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); LOG.debug("Going to commit"); @@ -1575,17 +1584,17 @@ private void heartbeatTxn(Connection dbConn, long txnid) // NEVER call this function without first calling heartbeat(long, long) private long getTxnIdFromLockId(Connection dbConn, long extLockId) - throws NoSuchLockException, MetaException, SQLException { + throws NoSuchLockException, MetaException, SQLException { 
Statement stmt = null; try { stmt = dbConn.createStatement(); String s = "select hl_txnid from HIVE_LOCKS where hl_lock_ext_id = " + - extLockId; + extLockId; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); if (!rs.next()) { throw new MetaException("This should never happen! We already " + - "checked the lock existed but now we can't find it!"); + "checked the lock existed but now we can't find it!"); } long txnid = rs.getLong(1); LOG.debug("Return txnid " + (rs.wasNull() ? -1 : txnid)); @@ -1597,13 +1606,13 @@ private long getTxnIdFromLockId(Connection dbConn, long extLockId) // NEVER call this function without first calling heartbeat(long, long) private List getLockInfoFromLockId(Connection dbConn, long extLockId) - throws NoSuchLockException, MetaException, SQLException { + throws NoSuchLockException, MetaException, SQLException { Statement stmt = null; try { stmt = dbConn.createStatement(); String s = "select hl_lock_ext_id, hl_lock_int_id, hl_db, hl_table, " + - "hl_partition, hl_lock_state, hl_lock_type from HIVE_LOCKS where " + - "hl_lock_ext_id = " + extLockId; + "hl_partition, hl_lock_state, hl_lock_type from HIVE_LOCKS where " + + "hl_lock_ext_id = " + extLockId; LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); boolean sawAtLeastOne = false; @@ -1614,7 +1623,7 @@ private long getTxnIdFromLockId(Connection dbConn, long extLockId) } if (!sawAtLeastOne) { throw new MetaException("This should never happen! We already " + - "checked the lock existed but now we can't find it!"); + "checked the lock existed but now we can't find it!"); } return ourLockInfo; } finally { @@ -1632,7 +1641,7 @@ private void timeOutLocks(Connection dbConn) throws SQLException, MetaException stmt = dbConn.createStatement(); // Remove any timed out locks from the table. 
String s = "delete from HIVE_LOCKS where hl_last_heartbeat < " + - (now - timeout); + (now - timeout); LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); LOG.debug("Going to commit"); @@ -1652,7 +1661,7 @@ private void timeOutTxns(Connection dbConn) throws SQLException, MetaException { stmt = dbConn.createStatement(); // Abort any timed out locks from the table. String s = "select txn_id from TXNS where txn_state = '" + TXN_OPEN + - "' and txn_last_heartbeat < " + (now - timeout); + "' and txn_last_heartbeat < " + (now - timeout); LOG.debug("Going to execute query <" + s + ">"); ResultSet rs = stmt.executeQuery(s); List deadTxns = new ArrayList(); @@ -1675,12 +1684,12 @@ private static synchronized void setupJdbcConnectionPool(HiveConf conf) throws S String passwd; try { passwd = ShimLoader.getHadoopShims().getPassword(conf, - HiveConf.ConfVars.METASTOREPWD.varname); + HiveConf.ConfVars.METASTOREPWD.varname); } catch (IOException err) { throw new SQLException("Error getting metastore password", err); } String connectionPooler = HiveConf.getVar(conf, - HiveConf.ConfVars.METASTORE_CONNECTION_POOLING_TYPE).toLowerCase(); + HiveConf.ConfVars.METASTORE_CONNECTION_POOLING_TYPE).toLowerCase(); if ("bonecp".equals(connectionPooler)) { BoneCPConfig config = new BoneCPConfig(); @@ -1696,22 +1705,22 @@ private static synchronized void setupJdbcConnectionPool(HiveConf conf) throws S // This doesn't get used, but it's still necessary, see // http://svn.apache.org/viewvc/commons/proper/dbcp/branches/DBCP_1_4_x_BRANCH/doc/ManualPoolingDataSourceExample.java?view=markup PoolableConnectionFactory poolConnFactory = - new PoolableConnectionFactory(connFactory, objectPool, null, null, false, true); + new PoolableConnectionFactory(connFactory, objectPool, null, null, false, true); connPool = new PoolingDataSource(objectPool); } else { throw new RuntimeException("Unknown JDBC connection pooling " + connectionPooler); } } - private static synchronized void 
buildJumpTable() { + private static synchronized void buildJumpTable() { if (jumpTable != null) return; jumpTable = - new HashMap>>(3); + new HashMap>>(3); // SR: Lock we are trying to acquire is shared read Map> m = - new HashMap>(3); + new HashMap>(3); jumpTable.put(LockType.SHARED_READ, m); // SR.SR: Lock we are examining is shared read @@ -1743,7 +1752,7 @@ private static synchronized void buildJumpTable() { // that something is blocking it that would not block a read. m2.put(LockState.WAITING, LockAction.KEEP_LOOKING); - // SR.E: Lock we are examining is exclusive + // SR.E: Lock we are examining is exclusive m2 = new HashMap(2); m.put(LockType.EXCLUSIVE, m2); @@ -1777,7 +1786,7 @@ private static synchronized void buildJumpTable() { m2.put(LockState.ACQUIRED, LockAction.WAIT); m2.put(LockState.WAITING, LockAction.WAIT); - // SW.E: Lock we are examining is exclusive + // SW.E: Lock we are examining is exclusive m2 = new HashMap(2); m.put(LockType.EXCLUSIVE, m2); @@ -1805,7 +1814,7 @@ private static synchronized void buildJumpTable() { m2.put(LockState.ACQUIRED, LockAction.WAIT); m2.put(LockState.WAITING, LockAction.WAIT); - // E.E: Lock we are examining is exclusive + // E.E: Lock we are examining is exclusive m2 = new HashMap(2); m.put(LockType.EXCLUSIVE, m2); @@ -1813,4 +1822,20 @@ private static synchronized void buildJumpTable() { m2.put(LockState.ACQUIRED, LockAction.WAIT); m2.put(LockState.WAITING, LockAction.WAIT); } + /** + * Returns true if {@code ex} should be retried + */ + private static boolean isRetryable(Exception ex) { + if(ex instanceof SQLException) { + SQLException sqlException = (SQLException)ex; + if("08S01".equalsIgnoreCase(sqlException.getSQLState())) { + //in MSSQL this means Communication Link Failure + return true; + } + } + return false; + } + private static String getMessage(SQLException ex) { + return ex.getMessage() + "(SQLState=" + ex.getSQLState() + ",ErrorCode=" + ex.getErrorCode() + ")"; + } } diff --git 
metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java index e1f1f49..e85ea34 100644 --- metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java +++ metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java @@ -1124,11 +1124,11 @@ public void run() { LOG.debug("no exception, no deadlock"); } catch (SQLException e) { try { - txnHandler.detectDeadlock(conn1, e, "thread t1"); + txnHandler.checkRetryable(conn1, e, "thread t1"); LOG.debug("Got an exception, but not a deadlock, SQLState is " + e.getSQLState() + " class of exception is " + e.getClass().getName() + " msg is <" + e.getMessage() + ">"); - } catch (TxnHandler.DeadlockException de) { + } catch (TxnHandler.RetryException de) { LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " + "exception is " + e.getClass().getName() + " msg is <" + e .getMessage() + ">"); @@ -1154,11 +1154,11 @@ public void run() { LOG.debug("no exception, no deadlock"); } catch (SQLException e) { try { - txnHandler.detectDeadlock(conn2, e, "thread t2"); + txnHandler.checkRetryable(conn2, e, "thread t2"); LOG.debug("Got an exception, but not a deadlock, SQLState is " + e.getSQLState() + " class of exception is " + e.getClass().getName() + " msg is <" + e.getMessage() + ">"); - } catch (TxnHandler.DeadlockException de) { + } catch (TxnHandler.RetryException de) { LOG.debug("Forced a deadlock, SQLState is " + e.getSQLState() + " class of " + "exception is " + e.getClass().getName() + " msg is <" + e .getMessage() + ">");