diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 27a56dd..f288dfa 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1655,6 +1655,12 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal " of the lock manager is dumped to log file. This is for debugging. See also " + "hive.lock.numretries and hive.lock.sleep.between.retries."), + HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 10000, "Maximum number of open transactions. If \n" + + "current open transactions reach this limit, future open transaction request will be \n" + + "rejected, until this number goes below the limit."), + HIVE_COUNT_OPEN_TXNS_INTERVAL("hive.check.open.txns.interval", "10s", + new TimeValidator(TimeUnit.SECONDS), "Time in seconds between checks to count open txns"), + HIVE_TXN_MAX_OPEN_BATCH("hive.txn.max.open.batch", 1000, "Maximum number of transactions that can be fetched in one call to open_txns().\n" + "This controls how many transactions streaming agents such as Flume or Storm open\n" + diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index d4d0162..f7fc678 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -25,6 +25,7 @@ import org.apache.commons.dbcp.PoolableConnectionFactory; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; +import org.apache.hive.common.util.ShutdownHookManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.commons.dbcp.PoolingDataSource; @@ -42,7 +43,11 @@ import java.io.IOException; import java.sql.*; import java.util.*; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; /** @@ -116,6 +121,13 @@ static private DataSource connPool; static private boolean doRetryOnConnPool = false; + // ExecutorService for checking number of open txns from metastore periodically + private static ScheduledExecutorService checkOpenTxnsExecutorService = Executors + .newScheduledThreadPool(1); + static final int SHUTDOWN_HOOK_PRIORITY = 0; + // Current number of open txns + private static AtomicLong numOpenTxns = new AtomicLong(0); + /** * Number of consecutive deadlocks we have seen */ @@ -147,6 +159,25 @@ // and then they should catch RetryException and call themselves recursively. See commitTxn for an example. public TxnHandler() { + Runnable shutdownRunner = new Runnable() { + @Override + public void run() { + if (checkOpenTxnsExecutorService != null + && !checkOpenTxnsExecutorService.isShutdown() + && !checkOpenTxnsExecutorService.isTerminated()) { + LOG.info("Shutting down OpenTxnsCounter thread pool."); + checkOpenTxnsExecutorService.shutdown(); + } + } + }; + ShutdownHookManager.addShutdownHook(shutdownRunner, SHUTDOWN_HOOK_PRIORITY); + + long interval = + conf.getTimeVar(HiveConf.ConfVars.HIVE_COUNT_OPEN_TXNS_INTERVAL, TimeUnit.MILLISECONDS); + checkOpenTxnsExecutorService.scheduleWithFixedDelay(new OpenTxnsCounter(), 0, interval, + TimeUnit.MILLISECONDS); + LOG.info("Started " + OpenTxnsCounter.class.getName() + " with delay/interval = " + + 0 + "/" + 10 + " " + TimeUnit.SECONDS); } /** @@ -306,6 +337,11 @@ public GetOpenTxnsResponse getOpenTxns() throws MetaException { } } public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { + int maxOpenTxns = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_MAX_OPEN_TXNS); + if (numOpenTxns.longValue() >= maxOpenTxns) { + throw new MetaException("Current open transactions reach the upper limit: " + maxOpenTxns); + } + int numTxns = rqst.getNum_txns(); try { Connection dbConn = null; @@ -1579,6 +1615,40 @@ public int compare(LockType t1, LockType t2) { } private enum LockAction {ACQUIRE, WAIT, KEEP_LOOKING} + /** + * Background thread for updating numOpenTxns + */ + private class OpenTxnsCounter implements Runnable { + /** + * Retrieve the number of open transactions and update the count + */ + @Override + public void run() { + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + String s = "select count(*) from TXNS where txn_state = '" + TXN_OPEN + "'"; + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (!rs.next()) { + LOG.debug("Transaction database not properly configured, " + + "can't find txn_state from TXNS."); + } else { + numOpenTxns = new AtomicLong(rs.getLong(1)); + } + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + LOG.debug("Failed to update number of open transactions"); + } finally { + close(rs, stmt, dbConn); + } + } + } + // A jump table to figure out whether to wait, acquire, // or keep looking . Since // java doesn't have function pointers (grumble grumble) we store a