diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
index 939df3f..1dae7b9 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
@@ -73,4 +73,13 @@ public boolean isMajorCompaction() {
   public int compareTo(CompactionInfo o) {
     return getFullPartitionName().compareTo(o.getFullPartitionName());
   }
+  public String toString() {
+    return "id:" + id + "," +
+      "dbname:" + dbname + "," +
+      "tableName:" + tableName + "," +
+      "partName:" + partName + "," +
+      "type:" + type + "," +
+      "runAs:" + runAs + "," +
+      "tooManyAborts:" + tooManyAborts;
+  }
 }
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 26e72be..328a65c 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -95,7 +95,7 @@ public CompactionTxnHandler(HiveConf conf) {
       dbConn.rollback();
     } catch (SQLException e) {
       LOG.error("Unable to connect to transaction database " + e.getMessage());
-      checkRetryable(dbConn, e, "findPotentialCompactions");
+      checkRetryable(dbConn, e, "findPotentialCompactions(maxAborted:" + maxAborted + ")");
     } finally {
       closeDbConn(dbConn);
       closeStmt(stmt);
@@ -133,7 +133,7 @@ public void setRunAs(long cq_id, String user) throws MetaException {
       LOG.error("Unable to update compaction queue, " + e.getMessage());
       LOG.debug("Going to rollback");
       rollbackDBConn(dbConn);
-      checkRetryable(dbConn, e, "setRunAs");
+      checkRetryable(dbConn, e, "setRunAs(cq_id:" + cq_id + ",user:" + user + ")");
     } finally {
       closeDbConn(dbConn);
       closeStmt(stmt);
@@ -194,7 +194,7 @@ public CompactionInfo findNextToCompact(String workerId) throws MetaException {
       LOG.error("Unable to select next element for compaction, " + e.getMessage());
       LOG.debug("Going to rollback");
       rollbackDBConn(dbConn);
-      checkRetryable(dbConn, e, "findNextToCompact");
+      checkRetryable(dbConn, e, "findNextToCompact(workerId:" + workerId + ")");
       throw new MetaException("Unable to connect to transaction database " +
         StringUtils.stringifyException(e));
     } finally {
@@ -232,7 +232,7 @@ public void markCompacted(CompactionInfo info) throws MetaException {
       LOG.error("Unable to update compaction queue " + e.getMessage());
       LOG.debug("Going to rollback");
       rollbackDBConn(dbConn);
-      checkRetryable(dbConn, e, "markCompacted");
+      checkRetryable(dbConn, e, "markCompacted(" + info + ")");
       throw new MetaException("Unable to connect to transaction database " +
         StringUtils.stringifyException(e));
     } finally {
@@ -374,7 +374,7 @@ public void markCleaned(CompactionInfo info) throws MetaException {
       LOG.error("Unable to delete from compaction queue " + e.getMessage());
       LOG.debug("Going to rollback");
       rollbackDBConn(dbConn);
-      checkRetryable(dbConn, e, "markCleaned");
+      checkRetryable(dbConn, e, "markCleaned(" + info + ")");
       throw new MetaException("Unable to connect to transaction database " +
         StringUtils.stringifyException(e));
     } finally {
@@ -465,7 +465,7 @@ public void revokeFromLocalWorkers(String hostname) throws MetaException {
         e.getMessage());
       LOG.debug("Going to rollback");
       rollbackDBConn(dbConn);
-      checkRetryable(dbConn, e, "revokeFromLocalWorkers");
+      checkRetryable(dbConn, e, "revokeFromLocalWorkers(hostname:" + hostname + ")");
       throw new MetaException("Unable to connect to transaction database " +
         StringUtils.stringifyException(e));
     } finally {
@@ -508,7 +508,7 @@ public void revokeTimedoutWorkers(long timeout) throws MetaException {
         e.getMessage());
       LOG.debug("Going to rollback");
       rollbackDBConn(dbConn);
-      checkRetryable(dbConn, e, "revokeTimedoutWorkers");
+      checkRetryable(dbConn, e, "revokeTimedoutWorkers(timeout:" + timeout + ")");
       throw new MetaException("Unable to connect to transaction database " +
         StringUtils.stringifyException(e));
     } finally {
@@ -565,10 +565,9 @@ public void revokeTimedoutWorkers(long timeout) throws MetaException {
       dbConn.commit();
       return columns;
     } catch (SQLException e) {
-      LOG.error("Failed to find columns to analyze stats on for " + ci.tableName +
-        (ci.partName == null ? "" : "/" + ci.partName), e);
       rollbackDBConn(dbConn);
-      checkRetryable(dbConn, e, "findColumnsWithStats");
+      checkRetryable(dbConn, e, "findColumnsWithStats(" + ci.tableName +
+        (ci.partName == null ? "" : "/" + ci.partName) + ")");
       throw new MetaException("Unable to connect to transaction database " +
         StringUtils.stringifyException(e));
     } finally {
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 7c3b55c..fd9c275 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -318,7 +318,7 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException {
     } catch (SQLException e) {
       LOG.debug("Going to rollback");
       rollbackDBConn(dbConn);
-      checkRetryable(dbConn, e, "openTxns");
+      checkRetryable(dbConn, e, "openTxns(" + rqst + ")");
       throw new MetaException("Unable to select from transaction database " +
         StringUtils.stringifyException(e));
     } finally {
@@ -347,7 +347,7 @@ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaExcept
     } catch (SQLException e) {
       LOG.debug("Going to rollback");
       rollbackDBConn(dbConn);
-      checkRetryable(dbConn, e, "abortTxn");
+      checkRetryable(dbConn, e, "abortTxn(" + rqst + ")");
       throw new MetaException("Unable to update transaction database " +
         StringUtils.stringifyException(e));
     } finally {
@@ -400,7 +400,7 @@ public void commitTxn(CommitTxnRequest rqst)
     } catch (SQLException e) {
       LOG.debug("Going to rollback");
       rollbackDBConn(dbConn);
-      checkRetryable(dbConn, e, "commitTxn");
+      checkRetryable(dbConn, e, "commitTxn(" + rqst + ")");
       throw new MetaException("Unable to update transaction database " +
         StringUtils.stringifyException(e));
     } finally {
@@ -423,7 +423,7 @@ public LockResponse lock(LockRequest rqst)
     } catch (SQLException e) {
       LOG.debug("Going to rollback");
       rollbackDBConn(dbConn);
-      checkRetryable(dbConn, e, "lock");
+      checkRetryable(dbConn, e, "lock(" + rqst + ")");
       throw new MetaException("Unable to update transaction database " +
         StringUtils.stringifyException(e));
     } finally {
@@ -444,7 +444,7 @@ public LockResponse lockNoWait(LockRequest rqst)
     } catch (SQLException e) {
       LOG.debug("Going to rollback");
       rollbackDBConn(dbConn);
-      checkRetryable(dbConn, e, "lockNoWait");
+      checkRetryable(dbConn, e, "lockNoWait(" + rqst + ")");
       throw new MetaException("Unable to update transaction database " +
         StringUtils.stringifyException(e));
     } finally {
@@ -475,7 +475,7 @@ public LockResponse checkLock(CheckLockRequest rqst)
     } catch (SQLException e) {
       LOG.debug("Going to rollback");
       rollbackDBConn(dbConn);
-      checkRetryable(dbConn, e, "checkLock");
checkRetryable(dbConn, e, "checkLock(" + rqst + " )"); throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { @@ -526,7 +526,7 @@ public void unlock(UnlockRequest rqst) } catch (SQLException e) { LOG.debug("Going to rollback"); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "unlock"); + checkRetryable(dbConn, e, "unlock(" + rqst + ")"); throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { @@ -593,7 +593,7 @@ public ShowLocksResponse showLocks(ShowLocksRequest rqst) throws MetaException { LOG.debug("Going to rollback"); dbConn.rollback(); } catch (SQLException e) { - checkRetryable(dbConn, e, "showLocks"); + checkRetryable(dbConn, e, "showLocks(" + rqst + ")"); throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); } finally { @@ -624,7 +624,7 @@ public void heartbeat(HeartbeatRequest ids) } catch (SQLException e) { LOG.debug("Going to rollback"); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "heartbeat"); + checkRetryable(dbConn, e, "heartbeat(" + ids + ")"); throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); } finally { @@ -661,7 +661,7 @@ public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst } catch (SQLException e) { LOG.debug("Going to rollback"); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "heartbeatTxnRange"); + checkRetryable(dbConn, e, "heartbeatTxnRange(" + rqst + ")"); throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); } finally { @@ -742,7 +742,7 @@ public void compact(CompactionRequest rqst) throws MetaException { } catch (SQLException e) { LOG.debug("Going to rollback"); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "compact"); + checkRetryable(dbConn, e, "compact(" + rqst + ")"); throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); } finally { @@ -792,7 +792,7 @@ public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaExcep } catch (SQLException e) { LOG.debug("Going to rollback"); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "showCompact"); + checkRetryable(dbConn, e, "showCompact(" + rqst + ")"); throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e)); } finally { @@ -835,7 +835,7 @@ public void addDynamicPartitions(AddDynamicPartitions rqst) } catch (SQLException e) { LOG.debug("Going to rollback"); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "addDynamicPartitions"); + checkRetryable(dbConn, e, "addDynamicPartitions(" + rqst + ")"); throw new MetaException("Unable to insert into from transaction database " + StringUtils.stringifyException(e)); } finally { @@ -955,6 +955,7 @@ void close(ResultSet rs, Statement stmt, Connection dbConn) { * @param caller name of the method calling this * @throws org.apache.hadoop.hive.metastore.txn.TxnHandler.RetryException when deadlock * detected and retry count has not been exceeded. 
+ * TODO: make "caller" more elaborate like include lockId for example */ protected void checkRetryable(Connection conn, SQLException e, @@ -977,10 +978,12 @@ protected void checkRetryable(Connection conn, (dbProduct == DatabaseProduct.ORACLE && (e.getMessage().contains("deadlock detected") || e.getMessage().contains("can't serialize access for this transaction")))) { if (deadlockCnt++ < ALLOWED_REPEATED_DEADLOCKS) { - LOG.warn("Deadlock detected in " + caller + ", trying again."); + long waitInterval = deadlockRetryInterval * deadlockCnt; + LOG.warn("Deadlock detected in " + caller + ". Will wait " + waitInterval + + "ms try again up to " + (ALLOWED_REPEATED_DEADLOCKS - deadlockCnt + 1) + " times."); // Pause for a just a bit for retrying to avoid immediately jumping back into the deadlock. try { - Thread.sleep(deadlockRetryInterval * deadlockCnt); + Thread.sleep(waitInterval); } catch (InterruptedException ie) { // NOP } @@ -993,13 +996,14 @@ protected void checkRetryable(Connection conn, else if(isRetryable(e)) { //in MSSQL this means Communication Link Failure if(retryNum++ < retryLimit) { + LOG.warn("Retryable error detected in " + caller + ". Will wait " + retryInterval + + "ms and retry up to " + (retryLimit - retryNum + 1) + " times. Error: " + getMessage(e)); try { Thread.sleep(retryInterval); } catch(InterruptedException ex) { // } - LOG.warn("Retryable error detected in " + caller + ", trying again: " + getMessage(e)); throw new RetryException(); } else { diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java index e8c49ef..aa00573 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java @@ -79,10 +79,10 @@ public HiveLock lock(HiveLockObject key, HiveLockMode mode, */ LockState lock(LockRequest lock, String queryId, boolean isBlocking, List acquiredLocks) throws LockException { try { - LOG.debug("Requesting: queryId=" + queryId + " " + lock); + LOG.info("Requesting: queryId=" + queryId + " " + lock); LockResponse res = client.lock(lock); //link lockId to queryId - LOG.debug("Response " + res); + LOG.info("Response to queryId=" + queryId + " " + res); if(!isBlocking) { if(res.getState() == LockState.WAITING) { return LockState.WAITING; diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index ccbac80..f8fff1a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -105,7 +105,7 @@ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isB LockRequestBuilder rqstBuilder = new LockRequestBuilder(); //link queryId to txnId - LOG.debug("Setting lock request transaction to " + txnId + " for queryId=" + plan.getQueryId()); + LOG.info("Setting lock request transaction to " + txnId + " for queryId=" + plan.getQueryId()); rqstBuilder.setTransactionId(txnId) .setUser(username);
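For reference, the following is a minimal standalone sketch (not part of the patch) of the linear-backoff behavior the patched deadlock branch of checkRetryable implements: wait deadlockRetryInterval * deadlockCnt, log how many attempts remain, then signal the caller to retry. The class name, method name, and the two constants are illustrative assumptions standing in for TxnHandler's configured deadlockRetryInterval and ALLOWED_REPEATED_DEADLOCKS.

// Illustrative sketch only -- not part of the patch; names and constants are assumptions.
public class DeadlockRetrySketch {
  private static final int ALLOWED_REPEATED_DEADLOCKS = 10;    // assumed retry limit
  private static final long DEADLOCK_RETRY_INTERVAL_MS = 1000; // assumed base wait in ms
  private int deadlockCnt = 0;

  // Mirrors the patched branch: the wait grows linearly (1x, 2x, 3x, ...) with each deadlock.
  boolean onDeadlock(String caller) throws InterruptedException {
    if (deadlockCnt++ < ALLOWED_REPEATED_DEADLOCKS) {
      long waitInterval = DEADLOCK_RETRY_INTERVAL_MS * deadlockCnt;
      int attemptsLeft = ALLOWED_REPEATED_DEADLOCKS - deadlockCnt + 1;
      System.out.println("Deadlock detected in " + caller + ". Will wait " + waitInterval +
          "ms and try again up to " + attemptsLeft + " times.");
      Thread.sleep(waitInterval);
      return true;   // in TxnHandler this is where RetryException is thrown so the caller re-runs
    }
    deadlockCnt = 0; // retries exhausted; reset and let the original error surface
    return false;
  }
}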