diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 9e805bd..a74407b 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1461,8 +1461,8 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
     HIVE_UNLOCK_NUMRETRIES("hive.unlock.numretries", 10,
         "The number of times you want to retry to do one unlock"),
     HIVE_LOCK_SLEEP_BETWEEN_RETRIES("hive.lock.sleep.between.retries", "60s",
-        new TimeValidator(TimeUnit.SECONDS),
-        "The sleep time between various retries"),
+        new TimeValidator(TimeUnit.SECONDS, 0L, false, Long.MAX_VALUE, false),
+        "The maximum sleep time between various retries"),
     HIVE_LOCK_MAPRED_ONLY("hive.lock.mapred.only.operation", false,
         "This param is to control whether or not only do lock on queries\n" +
         "that need to execute at least one mapred job."),
@@ -1506,6 +1506,10 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
         "no transactions."),
     HIVE_TXN_TIMEOUT("hive.txn.timeout", "300s", new TimeValidator(TimeUnit.SECONDS),
         "time after which transactions are declared aborted if the client has not sent a heartbeat."),
+    TXN_MGR_DUMP_LOCK_STATE_ON_ACQUIRE_TIMEOUT("hive.txn.manager.dump.lock.state.on.acquire.timeout", false,
+        "Set this to true so that when an attempt to acquire a lock on a resource times out, the current state" +
+        " of the lock manager is dumped to the log file. This is for debugging. See also " +
+        "hive.lock.numretries and hive.lock.sleep.between.retries."),
 
     HIVE_TXN_MAX_OPEN_BATCH("hive.txn.max.open.batch", 1000,
         "Maximum number of transactions that can be fetched in one call to open_txns().\n" +
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index ca37bf0..3e3019c 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -518,12 +518,12 @@ public LockResponse checkLock(CheckLockRequest rqst)
       // Heartbeat on the lockid first, to assure that our lock is still valid.
       // Then look up the lock info (hopefully in the cache). If these locks
       // are associated with a transaction then heartbeat on that as well.
-      Long txnid = getTxnIdFromLockId(dbConn, extLockId);
-      if(txnid == null) {
+      LockInfo info = getTxnIdFromLockId(dbConn, extLockId);
+      if(info == null) {
         throw new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId));
       }
-      if (txnid > 0) {
-        heartbeatTxn(dbConn, txnid);
+      if (info.txnId > 0) {
+        heartbeatTxn(dbConn, info.txnId);
       }
       else {
         heartbeatLock(dbConn, extLockId);
@@ -570,28 +570,27 @@ public void unlock(UnlockRequest rqst)
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
         //hl_txnid <> 0 means it's associated with a transaction
-        String s = "delete from HIVE_LOCKS where hl_lock_ext_id = " + extLockId + " AND hl_txnid = 0";
+        String s = "delete from HIVE_LOCKS where hl_lock_ext_id = " + extLockId + " AND (hl_txnid = 0 OR" +
+          " (hl_txnid <> 0 AND hl_lock_state = '" + LOCK_WAITING + "'))";
         LOG.debug("Going to execute update <" + s + ">");
         int rc = stmt.executeUpdate(s);
         if (rc < 1) {
           LOG.debug("Going to rollback");
           dbConn.rollback();
-          Long txnid = getTxnIdFromLockId(dbConn, extLockId);
-          if(txnid == null) {
-            LOG.error("No lock found for unlock(" + rqst + ")");
+          LockInfo info = getTxnIdFromLockId(dbConn, extLockId);
+          if(info == null) {
+            LOG.error("No lock in " + LOCK_WAITING + " mode found for unlock(" + rqst + ")");
             throw new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId));
           }
-          if(txnid != 0) {
-            String msg = "Unlocking locks associated with transaction" +
-              " not permitted. Lockid " + JavaUtils.lockIdToString(extLockId) + " is associated with " +
-              "transaction " + JavaUtils.txnIdToString(txnid);
+          if(info.txnId != 0) {
+            String msg = "Unlocking locks associated with transaction not permitted. " + info;
             LOG.error(msg);
             throw new TxnOpenException(msg);
           }
-          if(txnid == 0) {
+          if(info.txnId == 0) {
             //we didn't see this lock when running DELETE stmt above but now it showed up
             //so should "should never happen" happened...
-            String msg = "Found lock " + JavaUtils.lockIdToString(extLockId) + " with " + JavaUtils.txnIdToString(txnid);
+            String msg = "Found lock in unexpected state " + info;
             LOG.error(msg);
             throw new MetaException(msg);
           }
@@ -1910,22 +1909,23 @@ private static void ensureValidTxn(Connection dbConn, long txnid, Statement stmt
     }
   }
 
-  private Long getTxnIdFromLockId(Connection dbConn, long extLockId)
+  private LockInfo getTxnIdFromLockId(Connection dbConn, long extLockId)
     throws NoSuchLockException, MetaException, SQLException {
     Statement stmt = null;
     ResultSet rs = null;
     try {
       stmt = dbConn.createStatement();
-      String s = "select hl_txnid from HIVE_LOCKS where hl_lock_ext_id = " +
-        extLockId;
+      String s = "select hl_lock_ext_id, hl_lock_int_id, hl_db, hl_table, " +
+        "hl_partition, hl_lock_state, hl_lock_type, hl_txnid from HIVE_LOCKS where " +
+        "hl_lock_ext_id = " + extLockId;
       LOG.debug("Going to execute query <" + s + ">");
       rs = stmt.executeQuery(s);
       if (!rs.next()) {
         return null;
       }
-      long txnid = rs.getLong(1);
-      LOG.debug("getTxnIdFromLockId(" + extLockId + ") Return " + JavaUtils.txnIdToString(txnid));
-      return txnid;
+      LockInfo info = new LockInfo(rs);
+      LOG.debug("getTxnIdFromLockId(" + extLockId + ") Return " + JavaUtils.txnIdToString(info.txnId));
+      return info;
     } finally {
       close(rs);
       closeStmt(stmt);
diff --git ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 8a47605..9d9dd53 100644
--- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -424,6 +424,8 @@
   CTAS_LOCATION_NONEMPTY(10304, "CREATE-TABLE-AS-SELECT cannot create table with location to a non-empty directory."),
   CTAS_CREATES_VOID_TYPE(10305, "CREATE-TABLE-AS-SELECT creates a VOID type, please use CAST to specify the type, near field: "),
   TBL_SORTED_NOT_BUCKETED(10306, "Destination table {0} found to be sorted but not bucketed.", true),
+  //{2} should be lockid
+  LOCK_ACQUIRE_TIMEDOUT(10307, "Lock acquisition for {0} timed out after {1}ms. {2}", true),
 
   //========================== 20000 range starts here ========================//
   SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."),
   SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. "
" diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index a210b95..7f22c43 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -2521,6 +2521,60 @@ public int compare(HiveLock o1, HiveLock o2) { return 0; } + public static void dumpLockInfo(DataOutputStream os, ShowLocksResponse rsp) throws IOException { + // Write a header + os.writeBytes("Lock ID"); + os.write(separator); + os.writeBytes("Database"); + os.write(separator); + os.writeBytes("Table"); + os.write(separator); + os.writeBytes("Partition"); + os.write(separator); + os.writeBytes("State"); + os.write(separator); + os.writeBytes("Type"); + os.write(separator); + os.writeBytes("Transaction ID"); + os.write(separator); + os.writeBytes("Last Hearbeat"); + os.write(separator); + os.writeBytes("Acquired At"); + os.write(separator); + os.writeBytes("User"); + os.write(separator); + os.writeBytes("Hostname"); + os.write(terminator); + + List locks = rsp.getLocks(); + if (locks != null) { + for (ShowLocksResponseElement lock : locks) { + os.writeBytes(Long.toString(lock.getLockid())); + os.write(separator); + os.writeBytes(lock.getDbname()); + os.write(separator); + os.writeBytes((lock.getTablename() == null) ? "NULL" : lock.getTablename()); + os.write(separator); + os.writeBytes((lock.getPartname() == null) ? "NULL" : lock.getPartname()); + os.write(separator); + os.writeBytes(lock.getState().toString()); + os.write(separator); + os.writeBytes(lock.getType().toString()); + os.write(separator); + os.writeBytes((lock.getTxnid() == 0) ? "NULL" : Long.toString(lock.getTxnid())); + os.write(separator); + os.writeBytes(Long.toString(lock.getLastheartbeat())); + os.write(separator); + os.writeBytes((lock.getAcquiredat() == 0) ? "NULL" : Long.toString(lock.getAcquiredat())); + os.write(separator); + os.writeBytes(lock.getUser()); + os.write(separator); + os.writeBytes(lock.getHostname()); + os.write(separator); + os.write(terminator); + } + } + } private int showLocksNewFormat(ShowLocksDesc showLocks, HiveLockManager lm) throws HiveException { @@ -2535,59 +2589,7 @@ private int showLocksNewFormat(ShowLocksDesc showLocks, HiveLockManager lm) // write the results in the file DataOutputStream os = getOutputStream(showLocks.getResFile()); try { - // Write a header - os.writeBytes("Lock ID"); - os.write(separator); - os.writeBytes("Database"); - os.write(separator); - os.writeBytes("Table"); - os.write(separator); - os.writeBytes("Partition"); - os.write(separator); - os.writeBytes("State"); - os.write(separator); - os.writeBytes("Type"); - os.write(separator); - os.writeBytes("Transaction ID"); - os.write(separator); - os.writeBytes("Last Hearbeat"); - os.write(separator); - os.writeBytes("Acquired At"); - os.write(separator); - os.writeBytes("User"); - os.write(separator); - os.writeBytes("Hostname"); - os.write(terminator); - - List locks = rsp.getLocks(); - if (locks != null) { - for (ShowLocksResponseElement lock : locks) { - os.writeBytes(Long.toString(lock.getLockid())); - os.write(separator); - os.writeBytes(lock.getDbname()); - os.write(separator); - os.writeBytes((lock.getTablename() == null) ? "NULL" : lock.getTablename()); - os.write(separator); - os.writeBytes((lock.getPartname() == null) ? 
"NULL" : lock.getPartname()); - os.write(separator); - os.writeBytes(lock.getState().toString()); - os.write(separator); - os.writeBytes(lock.getType().toString()); - os.write(separator); - os.writeBytes((lock.getTxnid() == 0) ? "NULL" : Long.toString(lock.getTxnid())); - os.write(separator); - os.writeBytes(Long.toString(lock.getLastheartbeat())); - os.write(separator); - os.writeBytes((lock.getAcquiredat() == 0) ? "NULL" : Long.toString(lock.getAcquiredat())); - os.write(separator); - os.writeBytes(lock.getUser()); - os.write(separator); - os.writeBytes(lock.getHostname()); - os.write(separator); - os.write(terminator); - } - - } + dumpLockInfo(os, rsp); } catch (FileNotFoundException e) { LOG.warn("show function: " + stringifyException(e)); return 1; diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java index 42616ac..e508416 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java @@ -17,22 +17,28 @@ */ package org.apache.hadoop.hive.ql.lockmgr; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.DDLTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.metrics.common.Metrics; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.*; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.thrift.TException; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Objects; import java.util.Set; +import java.util.concurrent.TimeUnit; /** * An implementation of HiveLockManager for use with {@link org.apache.hadoop.hive.ql.lockmgr.DbTxnManager}. 
@@ -44,20 +50,20 @@
   static final private String CLASS_NAME = DbLockManager.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
 
-  private static final long MAX_SLEEP = 15000;
-  private HiveLockManagerCtx context;
+  private long MAX_SLEEP;
   private Set<DbHiveLock> locks;
   private IMetaStoreClient client;
   private long nextSleep = 50;
+  private final HiveConf conf;
 
-  DbLockManager(IMetaStoreClient client) {
+  DbLockManager(IMetaStoreClient client, HiveConf conf) {
     locks = new HashSet<>();
     this.client = client;
+    this.conf = conf;
   }
 
   @Override
   public void setContext(HiveLockManagerCtx ctx) throws LockException {
-    context = ctx;
   }
 
   @Override
@@ -81,6 +87,11 @@ public HiveLock lock(HiveLockObject key, HiveLockMode mode,
    * @return the result of the lock attempt
    */
   LockState lock(LockRequest lock, String queryId, boolean isBlocking, List<HiveLock> acquiredLocks) throws LockException {
+    Objects.requireNonNull(queryId, "queryId cannot be null");
+    nextSleep = 50;
+    //get from conf to pick up changes; make sure not to set too low and kill the metastore
+    MAX_SLEEP = Math.max(15000, conf.getTimeVar(HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES, TimeUnit.MILLISECONDS));
+    int maxNumWaits = Math.max(0, conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES));
     try {
       LOG.info("Requesting: queryId=" + queryId + " " + lock);
       LockResponse res = client.lock(lock);
@@ -91,15 +102,27 @@ LockState lock(LockRequest lock, String queryId, boolean isBlocking, List<HiveLock> acquiredLocks) throws LockException {
diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
--- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
     private final HashSet<ReadEntity> inputs;
     private final HashSet<WriteEntity> outputs;
+    private final String queryId;
 
     MockQueryPlan(TestDbTxnManager test) {
       HashSet<ReadEntity> r = test.readEntities;
       HashSet<WriteEntity> w = test.writeEntities;
       inputs = (r == null) ? new HashSet<ReadEntity>() : r;
       outputs = (w == null) ? new HashSet<WriteEntity>() : w;
+      queryId = makeQueryId();
     }
 
     @Override
@@ -395,6 +397,10 @@ public void tearDown() throws Exception {
     public HashSet<WriteEntity> getOutputs() {
       return outputs;
     }
+    @Override
+    public String getQueryId() {
+      return queryId;
+    }
   }
 
   private Table newTable(boolean isPartitioned) {
diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index c6a7fcb..200473b 100644
--- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -26,7 +26,7 @@
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.Driver;
-import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.junit.After;
@@ -211,6 +211,34 @@ public void updateSelectUpdate() throws Exception {
     checkCmdOnDriver(cpr);
   }
+  @Test
+  public void testLockRetryLimit() throws Exception {
+    conf.setIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES, 2);
+    conf.setBoolVar(HiveConf.ConfVars.TXN_MGR_DUMP_LOCK_STATE_ON_ACQUIRE_TIMEOUT, true);
+    HiveTxnManager otherTxnMgr = new DbTxnManager();
+    ((DbTxnManager)otherTxnMgr).setHiveConf(conf);
+    CommandProcessorResponse cpr = driver.run("create table T9(a int)");
+    checkCmdOnDriver(cpr);
+    cpr = driver.compileAndRespond("select * from T9");
+    checkCmdOnDriver(cpr);
+    txnMgr.acquireLocks(driver.getPlan(), ctx, "Vincent Vega");
+    List<ShowLocksResponseElement> locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 1, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T9", null, locks.get(0));
+
+    cpr = driver.compileAndRespond("drop table T9");
+    checkCmdOnDriver(cpr);
+    try {
+      otherTxnMgr.acquireLocks(driver.getPlan(), ctx, "Winston Winnfield");
+    }
+    catch(LockException ex) {
+      Assert.assertEquals("Got wrong lock exception", ErrorMsg.LOCK_ACQUIRE_TIMEDOUT, ex.getCanonicalErrorMsg());
+    }
+    locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 1, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T9", null, locks.get(0));
+    otherTxnMgr.closeTxnManager();
+  }
   private void checkLock(LockType type, LockState state, String db, String table, String partition, ShowLocksResponseElement l) {
     Assert.assertEquals(l.toString(),l.getType(), type);
@@ -226,6 +254,9 @@ private String normalizeCase(String s) {
     return s == null ? null : s.toLowerCase();
   }
   private List<ShowLocksResponseElement> getLocks() throws Exception {
+    return getLocks(this.txnMgr);
+  }
+  private List<ShowLocksResponseElement> getLocks(HiveTxnManager txnMgr) throws Exception {
     ShowLocksResponse rsp = ((DbLockManager)txnMgr.getLockManager()).getLocks();
     return rsp.getLocks();
   }
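
Note: the reworked body of DbLockManager.lock() (the `@@ -91,15 +102,27 @@` hunk above) is not reproduced in this patch excerpt. As a reading aid only, here is a minimal, self-contained sketch of the retry policy that the new fields imply: `hive.lock.numretries` bounds how many times a WAITING response is re-checked, and `MAX_SLEEP` (floored at 15 seconds, raised by `hive.lock.sleep.between.retries`) caps an exponential backoff that starts at 50ms. `LockBackoffSketch` and `tryAcquire()` are stand-ins invented for illustration, not Hive APIs, and this is not the patch's exact code.

```java
import java.util.concurrent.ThreadLocalRandom;

public class LockBackoffSketch {
  private static long nextSleep = 50;           // ms; the patch resets this per lock() call
  private static final long MAX_SLEEP = 15000;  // >= hive.lock.sleep.between.retries, 15s floor

  // Capped exponential backoff between re-checks, so a tight loop
  // cannot hammer the metastore.
  private static void backoff() throws InterruptedException {
    nextSleep = Math.min(nextSleep * 2, MAX_SLEEP);
    Thread.sleep(nextSleep);
  }

  public static void main(String[] args) throws InterruptedException {
    int maxNumWaits = 2;  // hive.lock.numretries
    int numRetries = 0;
    long start = System.currentTimeMillis();
    boolean acquired = tryAcquire();
    while (!acquired && numRetries++ < maxNumWaits) {
      backoff();
      acquired = tryAcquire();  // stand-in for client.checkLock(lockid)
    }
    if (!acquired) {
      // At this point the patch raises LockException(ErrorMsg.LOCK_ACQUIRE_TIMEDOUT, ...)
      // and, when hive.txn.manager.dump.lock.state.on.acquire.timeout is true, logs the
      // full lock table via DDLTask.dumpLockInfo(...).
      System.err.println("Lock acquisition timed out after "
          + (System.currentTimeMillis() - start) + "ms");
    }
  }

  private static boolean tryAcquire() {
    return ThreadLocalRandom.current().nextBoolean();  // stand-in for the metastore's answer
  }
}
```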
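A client enabling the new behavior would set the knobs roughly as testLockRetryLimit does above. This fragment is illustrative: the setIntVar/setBoolVar calls appear verbatim in the test, while setTimeVar(var, value, unit) is HiveConf's standard time setter and is assumed here rather than taken from this patch.

```java
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hive.conf.HiveConf;

public class LockRetryConfigExample {
  public static void main(String[] args) {
    // Give up after 2 re-checks of a WAITING lock and, on timeout,
    // dump the full lock table to the log for debugging.
    HiveConf conf = new HiveConf();
    conf.setIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES, 2);
    conf.setTimeVar(HiveConf.ConfVars.HIVE_LOCK_SLEEP_BETWEEN_RETRIES, 60, TimeUnit.SECONDS);
    conf.setBoolVar(HiveConf.ConfVars.TXN_MGR_DUMP_LOCK_STATE_ON_ACQUIRE_TIMEOUT, true);
  }
}
```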