Limit the number of open transactions accepted by the metastore.

Adds hive.max.open.txns and hive.count.open.txns.interval, a metastore housekeeper
(AcidOpenTxnsCounterService) that periodically counts open transactions in the TXNS
table, and a check in TxnHandler.openTxns() that rejects new requests while the cached
count is at the limit (and until it falls below 90% of it).

diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 98c6372..de40f19 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1660,6 +1660,12 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal
         " of the lock manager is dumped to log file. This is for debugging. See also " +
         "hive.lock.numretries and hive.lock.sleep.between.retries."),
 
+    HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 10000, "Maximum number of open transactions. If \n" +
+        "current open transactions reach this limit, future open transaction requests will be \n" +
+        "rejected, until this number goes below the limit."),
+    HIVE_COUNT_OPEN_TXNS_INTERVAL("hive.count.open.txns.interval", "10s",
+        new TimeValidator(TimeUnit.SECONDS), "Time in seconds between checks to count open txns"),
+
     HIVE_TXN_MAX_OPEN_BATCH("hive.txn.max.open.batch", 1000,
         "Maximum number of transactions that can be fetched in one call to open_txns().\n" +
         "This controls how many transactions streaming agents such as Flume or Storm open\n" +
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index c9fadad..bb82f0d 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -6628,6 +6628,7 @@ private static void startHouseKeeperService(HiveConf conf) throws Exception {
     }
     startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService"));
     startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService"));
+    startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidOpenTxnsCounterService"));
   }
   private static void startHouseKeeperService(HiveConf conf, Class c) throws Exception {
     //todo: when metastore adds orderly-shutdown logic, houseKeeper.stop()
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 53d2bb4..ea24936 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -44,6 +44,7 @@
 import java.sql.*;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
@@ -117,6 +118,13 @@
   static private DataSource connPool;
   static private boolean doRetryOnConnPool = false;
 
+  // Maximum number of open transactions that's allowed
+  private int maxOpenTxns;
+  // Current number of open txns; refreshed in place by countOpenTxns(), never reassigned
+  private static final AtomicLong numOpenTxns = new AtomicLong(0);
+  // Whether number of open transactions reaches the threshold
+  private boolean tooManyOpenTxns = false;
+
   /**
    * Number of consecutive deadlocks we have seen
    */
@@ -180,6 +188,7 @@ public void setConf(HiveConf conf) {
         TimeUnit.MILLISECONDS);
     retryLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HMSHANDLERATTEMPTS);
     deadlockRetryInterval = retryInterval / 10;
+    maxOpenTxns = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_MAX_OPEN_TXNS);
   }
 
   public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException {
@@ -307,6 +316,19 @@ public GetOpenTxnsResponse getOpenTxns() throws MetaException {
     }
   }
   public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException {
+    // Admission control: start rejecting once the cached open-txn count reaches maxOpenTxns,
+    // and keep rejecting until it drops below 90% of the limit, so the gate does not flap.
+    if (!tooManyOpenTxns && numOpenTxns.longValue() >= maxOpenTxns) {
+      tooManyOpenTxns = true;
+    }
+    if (tooManyOpenTxns) {
+      if (numOpenTxns.longValue() < maxOpenTxns * 0.9) {
+        tooManyOpenTxns = false;
+      } else {
+        throw new MetaException("Current open transactions reach the upper limit: " + maxOpenTxns);
+      }
+    }
+
     int numTxns = rqst.getNum_txns();
     try {
       Connection dbConn = null;
@@ -2396,6 +2418,36 @@ public void performTimeOuts() {
     }
   }
 
+  /**
+   * Counts the rows of TXNS whose txn_state is open and stores the result in the shared
+   * numOpenTxns counter that openTxns() compares against the configured limit.
+   * On a SQL failure the connection is rolled back and the previous count is left unchanged.
+   */
+  public void countOpenTxns() {
+    Connection dbConn = null;
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+      stmt = dbConn.createStatement();
+      String s = "select count(*) from TXNS where txn_state = '" + TXN_OPEN + "'";
+      LOG.debug("Going to execute query <" + s + ">");
+      rs = stmt.executeQuery(s);
+      if (!rs.next()) {
+        LOG.error("Transaction database not properly configured, " +
+            "can't find txn_state from TXNS.");
+      } else {
+        numOpenTxns.set(rs.getLong(1));
+      }
+    } catch (SQLException e) {
+      LOG.debug("Going to rollback");
+      rollbackDBConn(dbConn);
+      LOG.error("Failed to update number of open transactions", e);
+    } finally {
+      close(rs, stmt, dbConn);
+    }
+  }
+
   private static synchronized void setupJdbcConnectionPool(HiveConf conf) throws SQLException {
     if (connPool != null) return;
 
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 6d738b5..39c0589 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -76,6 +76,12 @@
   public GetOpenTxnsResponse getOpenTxns() throws MetaException;
 
   /**
+   * Recount the open transactions and cache the result; nothing is returned to the caller.
+   * @throws MetaException
+   */
+  public void countOpenTxns() throws MetaException;
+
+  /**
    * Open a set of transactions
    * @param rqst request to open transactions
    * @return information on opened transactions
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java
new file mode 100644
index 0000000..2532cac
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Background running thread, periodically updating number of open transactions.
+ * Runs inside Hive Metastore Service.
+ */
+public class AcidOpenTxnsCounterService extends HouseKeeperServiceBase {
+  private static final Logger LOG = LoggerFactory.getLogger(AcidOpenTxnsCounterService.class);
+
+  @Override
+  protected long getStartDelayMs() {
+    return hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_COUNT_OPEN_TXNS_INTERVAL, TimeUnit.MILLISECONDS);
+  }
+  @Override
+  protected long getIntervalMs() {
+    return hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_COUNT_OPEN_TXNS_INTERVAL, TimeUnit.MILLISECONDS);
+  }
+  @Override
+  protected Runnable getScheduedAction(HiveConf hiveConf, AtomicInteger isAliveCounter) {
+    return new OpenTxnsCounter(hiveConf, isAliveCounter);
+  }
+
+  @Override
+  public String getServiceDescription() {
+    return "Count number of open transactions";
+  }
+
+  private static final class OpenTxnsCounter implements Runnable {
+    private final TxnStore txnHandler;
+    private final AtomicInteger isAliveCounter;
+    private OpenTxnsCounter(HiveConf hiveConf, AtomicInteger isAliveCounter) {
+      txnHandler = TxnUtils.getTxnStore(hiveConf);
+      this.isAliveCounter = isAliveCounter;
+    }
+
+    @Override
+    public void run() {
+      try {
+        long startTime = System.currentTimeMillis();
+        txnHandler.countOpenTxns();
+        int count = isAliveCounter.incrementAndGet();
+        LOG.info("OpenTxnsCounter ran for " + (System.currentTimeMillis() - startTime)/1000 + " seconds. isAliveCounter=" + count);
+      }
+      catch(Throwable t) {
+        LOG.error("Serious error in {}: {}", Thread.currentThread().getName(), t.getMessage(), t);
+      }
+    }
+  }
+}
diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 9b00435..f4c1416 100644
--- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
@@ -39,6 +40,7 @@
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService;
+import org.apache.hadoop.hive.ql.txn.AcidOpenTxnsCounterService;
 import org.apache.hadoop.hive.ql.txn.compactor.Cleaner;
 import org.apache.hadoop.hive.ql.txn.compactor.Initiator;
 import org.apache.hadoop.hive.ql.txn.compactor.Worker;
@@ -672,6 +674,33 @@ public void writeBetweenWorkerAndCleaner() throws Exception {
     Assert.assertEquals("", expected,
       runStatementOnDriver("select a,b from " + tblName + " order by a"));
   }
+
+  @Test
+  public void testOpenTxnsCounter() throws Exception {
+    hiveConf.setIntVar(HiveConf.ConfVars.HIVE_MAX_OPEN_TXNS, 3);
+    hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_COUNT_OPEN_TXNS_INTERVAL, 100, TimeUnit.MILLISECONDS);
+    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+    txnHandler.openTxns(new OpenTxnRequest(3, "me", "localhost"));
+
+    // This will update current number of open txns to 3
+    AcidOpenTxnsCounterService openTxnsCounterService = new AcidOpenTxnsCounterService();
+    openTxnsCounterService.start(hiveConf);
+    while (openTxnsCounterService.getIsAliveCounter() <= Integer.MIN_VALUE) {
+      Thread.sleep(100);
+    }
+    openTxnsCounterService.stop();
+
+    MetaException exception = null;
+    // This should fail once it finds out the threshold has been reached
+    try {
+      txnHandler.openTxns(new OpenTxnRequest(1, "you", "localhost"));
+    } catch (MetaException e) {
+      exception = e;
+    }
+    Assert.assertNotNull("Opening new transaction shouldn't be allowed", exception);
+    Assert.assertEquals("Current open transactions reach the upper limit: 3", exception.getMessage());
+  }
+
   /**
    * takes raw data and turns it into a string as if from Driver.getResults()
    * sorts rows in dictionary order