diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index db942b0..6eb7649 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1461,8 +1461,8 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { HIVE_UNLOCK_NUMRETRIES("hive.unlock.numretries", 10, "The number of times you want to retry to do one unlock"), HIVE_LOCK_SLEEP_BETWEEN_RETRIES("hive.lock.sleep.between.retries", "60s", - new TimeValidator(TimeUnit.SECONDS), - "The sleep time between various retries"), + new TimeValidator(TimeUnit.SECONDS, 50L, true, Long.MAX_VALUE, false), + "The maximum sleep time between various retries"), HIVE_LOCK_MAPRED_ONLY("hive.lock.mapred.only.operation", false, "This param is to control whether or not only do lock on queries\n" + "that need to execute at least one mapred job."), @@ -1506,6 +1506,10 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { "no transactions."), HIVE_TXN_TIMEOUT("hive.txn.timeout", "300s", new TimeValidator(TimeUnit.SECONDS), "time after which transactions are declared aborted if the client has not sent a heartbeat."), + TXN_MGR_DUMP_LOCK_STATE_ON_ACQUIRE_TIMEOUT("hive.txn.manager.dump.lock.state.on.acquire.timeout", false, + "Set this to true so that when attempt to acquire a lock on resource times out, the current state" + + " of the lock manager is dumped to log file. This is for debugging. See also " + + "hive.lock.numretries and hive.lock.sleep.between.retries."), HIVE_TXN_MAX_OPEN_BATCH("hive.txn.max.open.batch", 1000, "Maximum number of transactions that can be fetched in one call to open_txns().\n" + diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index ca37bf0..ff86d95 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -518,12 +518,12 @@ public LockResponse checkLock(CheckLockRequest rqst) // Heartbeat on the lockid first, to assure that our lock is still valid. // Then look up the lock info (hopefully in the cache). If these locks // are associated with a transaction then heartbeat on that as well. - Long txnid = getTxnIdFromLockId(dbConn, extLockId); - if(txnid == null) { + LockInfo info = getTxnIdFromLockId(dbConn, extLockId); + if(info == null) { throw new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId)); } - if (txnid > 0) { - heartbeatTxn(dbConn, txnid); + if (info.txnId > 0) { + heartbeatTxn(dbConn, info.txnId); } else { heartbeatLock(dbConn, extLockId); @@ -570,28 +570,28 @@ public void unlock(UnlockRequest rqst) dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); //hl_txnid <> 0 means it's associated with a transaction - String s = "delete from HIVE_LOCKS where hl_lock_ext_id = " + extLockId + " AND hl_txnid = 0"; + String s = "delete from HIVE_LOCKS where hl_lock_ext_id = " + extLockId + " AND (hl_txnid = 0 OR" + + " (hl_txnid <> 0 AND hl_lock_state = '" + LOCK_WAITING + "'))"; LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); if (rc < 1) { LOG.debug("Going to rollback"); dbConn.rollback(); - Long txnid = getTxnIdFromLockId(dbConn, extLockId); - if(txnid == null) { - LOG.error("No lock found for unlock(" + rqst + ")"); + LockInfo info = getTxnIdFromLockId(dbConn, extLockId); + if(info == null) { + LOG.error("No lock in " + LOCK_WAITING + " mode found for unlock(" + rqst + ")"); throw new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId)); } - if(txnid != 0) { + if(info.txnId != 0) { String msg = "Unlocking locks associated with transaction" + - " not permitted. Lockid " + JavaUtils.lockIdToString(extLockId) + " is associated with " + - "transaction " + JavaUtils.txnIdToString(txnid); + " not permitted. " + info; LOG.error(msg); throw new TxnOpenException(msg); } - if(txnid == 0) { + if(info.txnId == 0) { //we didn't see this lock when running DELETE stmt above but now it showed up //so should "should never happen" happened... - String msg = "Found lock " + JavaUtils.lockIdToString(extLockId) + " with " + JavaUtils.txnIdToString(txnid); + String msg = "Found lock in unexpected state " + info; LOG.error(msg); throw new MetaException(msg); } @@ -1910,22 +1910,23 @@ private static void ensureValidTxn(Connection dbConn, long txnid, Statement stmt } } - private Long getTxnIdFromLockId(Connection dbConn, long extLockId) + private LockInfo getTxnIdFromLockId(Connection dbConn, long extLockId) throws NoSuchLockException, MetaException, SQLException { Statement stmt = null; ResultSet rs = null; try { stmt = dbConn.createStatement(); - String s = "select hl_txnid from HIVE_LOCKS where hl_lock_ext_id = " + - extLockId; + String s = "select hl_lock_ext_id, hl_lock_int_id, hl_db, hl_table, " + + "hl_partition, hl_lock_state, hl_lock_type, hl_txnid from HIVE_LOCKS where " + + "hl_lock_ext_id = " + extLockId; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); if (!rs.next()) { return null; } - long txnid = rs.getLong(1); - LOG.debug("getTxnIdFromLockId(" + extLockId + ") Return " + JavaUtils.txnIdToString(txnid)); - return txnid; + LockInfo info = new LockInfo(rs); + LOG.debug("getTxnIdFromLockId(" + extLockId + ") Return " + JavaUtils.txnIdToString(info.txnId)); + return info; } finally { close(rs); closeStmt(stmt); diff --git ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 8a47605..9d9dd53 100644 --- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -424,6 +424,8 @@ CTAS_LOCATION_NONEMPTY(10304, "CREATE-TABLE-AS-SELECT cannot create table with location to a non-empty directory."), CTAS_CREATES_VOID_TYPE(10305, "CREATE-TABLE-AS-SELECT creates a VOID type, please use CAST to specify the type, near field: "), TBL_SORTED_NOT_BUCKETED(10306, "Destination table {0} found to be sorted but not bucketed.", true), + //{2} should be lockid + LOCK_ACQUIRE_TIMEDOUT(10307, "Lock acquisition for {0} timed out after {1}ms. {2}", true), //========================== 20000 range starts here ========================// SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."), SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. " diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java index 42616ac..3b97e37 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hive.ql.lockmgr; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.common.JavaUtils; @@ -29,10 +32,14 @@ import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.thrift.TException; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; /** * An implementation of HiveLockManager for use with {@link org.apache.hadoop.hive.ql.lockmgr.DbTxnManager}. @@ -44,20 +51,20 @@ static final private String CLASS_NAME = DbLockManager.class.getName(); static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME); - private static final long MAX_SLEEP = 15000; - private HiveLockManagerCtx context; + private long MAX_SLEEP; private Set locks; private IMetaStoreClient client; private long nextSleep = 50; + private final HiveConf conf; - DbLockManager(IMetaStoreClient client) { + DbLockManager(IMetaStoreClient client, HiveConf conf) { locks = new HashSet<>(); this.client = client; + this.conf = conf; } @Override public void setContext(HiveLockManagerCtx ctx) throws LockException { - context = ctx; } @Override @@ -81,6 +88,10 @@ public HiveLock lock(HiveLockObject key, HiveLockMode mode, * @return the result of the lock attempt */ LockState lock(LockRequest lock, String queryId, boolean isBlocking, List acquiredLocks) throws LockException { + nextSleep = 50; + //get from conf to pick up changes; make not to set too low and kill the metastore + MAX_SLEEP = Math.max(15000, conf.getTimeVar(HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES, TimeUnit.MILLISECONDS)); + int maxNumWaits = Math.max(0, conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES)); try { LOG.info("Requesting: queryId=" + queryId + " " + lock); LockResponse res = client.lock(lock); @@ -91,15 +102,27 @@ LockState lock(LockRequest lock, String queryId, boolean isBlocking, List locks = rsp.getLocks(); + if (locks != null) { + for (ShowLocksResponseElement lock : locks) { + os.writeBytes(Long.toString(lock.getLockid())); + os.write(separator); + os.writeBytes(lock.getDbname()); + os.write(separator); + os.writeBytes((lock.getTablename() == null) ? "NULL" : lock.getTablename()); + os.write(separator); + os.writeBytes((lock.getPartname() == null) ? "NULL" : lock.getPartname()); + os.write(separator); + os.writeBytes(lock.getState().toString()); + os.write(separator); + os.writeBytes(lock.getType().toString()); + os.write(separator); + os.writeBytes((lock.getTxnid() == 0) ? "NULL" : Long.toString(lock.getTxnid())); + os.write(separator); + os.writeBytes(Long.toString(lock.getLastheartbeat())); + os.write(separator); + os.writeBytes((lock.getAcquiredat() == 0) ? "NULL" : Long.toString(lock.getAcquiredat())); + os.write(separator); + os.writeBytes(lock.getUser()); + os.write(separator); + os.writeBytes(lock.getHostname()); + os.write(separator); + os.write(terminator); + } + } + os.flush(); + LOG.info(baos.toString()); + } + catch(IOException ex) { + LOG.error("Dumping lock info for " + preamble + " failed: " + ex.getMessage(), ex); + } + } /** * Used to make another attempt to acquire a lock (in Waiting state) * @param extLockId @@ -259,8 +352,8 @@ public String toString() { /** * Clear the memory of the locks in this object. This won't clear the locks from the database. * It is for use with - * {@link #DbLockManager(org.apache.hadoop.hive.metastore.IMetaStoreClient).commitTxn} and - * {@link #DbLockManager(org.apache.hadoop.hive.metastore.IMetaStoreClient).rollbackTxn}. + * {@link #DbLockManager(org.apache.hadoop.hive.metastore.IMetaStoreClient, org.apache.hadoop.hive.conf.HiveConf)} .commitTxn} and + * {@link #DbLockManager(org.apache.hadoop.hive.metastore.IMetaStoreClient, org.apache.hadoop.hive.conf.HiveConf)} .rollbackTxn}. */ void clearLocalLockRecords() { locks.clear(); diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 97d2282..552367c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -96,7 +96,7 @@ public long openTxn(String user) throws LockException { public HiveLockManager getLockManager() throws LockException { init(); if (lockMgr == null) { - lockMgr = new DbLockManager(client); + lockMgr = new DbLockManager(client, conf); } return lockMgr; }