diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 5360ed4..9a7706b 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1671,6 +1671,12 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal " of the lock manager is dumped to log file. This is for debugging. See also " + "hive.lock.numretries and hive.lock.sleep.between.retries."), + HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 100000, "Maximum number of open transactions. If \n" + + "current open transactions reach this limit, future open transaction requests will be \n" + + "rejected, until this number goes below the limit."), + HIVE_COUNT_OPEN_TXNS_INTERVAL("hive.count.open.txns.interval", "1s", + new TimeValidator(TimeUnit.SECONDS), "Time in seconds between checks to count open txns"), + HIVE_TXN_MAX_OPEN_BATCH("hive.txn.max.open.batch", 1000, "Maximum number of transactions that can be fetched in one call to open_txns().\n" + "This controls how many transactions streaming agents such as Flume or Storm open\n" + diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index ed2057a..4c04c88 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -6719,6 +6719,7 @@ private static void startHouseKeeperService(HiveConf conf) throws Exception { } startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService")); startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService")); + startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidOpenTxnsCounterService")); } private static void startHouseKeeperService(HiveConf conf, Class c) throws Exception { //todo: when metastore adds orderly-shutdown logic, houseKeeper.stop() diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index df6591f..246f092 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hadoop.hive.common.classification.InterfaceStability; import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.tools.ant.taskdefs.Retry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.commons.dbcp.PoolingDataSource; @@ -48,6 +49,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; @@ -122,6 +124,13 @@ static private DataSource connPool; static private boolean doRetryOnConnPool = false; + // Maximum number of open transactions that's allowed + private static int maxOpenTxns = 0; + // Current number of open txns + private static long numOpenTxns = 0; + // Whether number of open transactions reaches the threshold + private static boolean tooManyOpenTxns = false; + /** * Number of consecutive deadlocks we have seen */ @@ -191,6 +200,7 @@ public void setConf(HiveConf conf) { TimeUnit.MILLISECONDS); retryLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HMSHANDLERATTEMPTS); deadlockRetryInterval = retryInterval / 10; + maxOpenTxns = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_MAX_OPEN_TXNS); } public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException { @@ -318,6 +328,20 @@ public GetOpenTxnsResponse getOpenTxns() throws MetaException { } } public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { + if (!tooManyOpenTxns && numOpenTxns >= maxOpenTxns) { + tooManyOpenTxns = true; + } + if (tooManyOpenTxns) { + if (numOpenTxns < maxOpenTxns * 0.9) { + tooManyOpenTxns = false; + } else { + LOG.warn("Maximum allowed number of open transactions (" + maxOpenTxns + ") has been " + + "reached. Current number of open transactions: " + numOpenTxns); + throw new MetaException("Maximum allowed number of open transactions has been reached. " + + "See hive.max.open.txns."); + } + } + int numTxns = rqst.getNum_txns(); try { Connection dbConn = null; @@ -2468,6 +2492,36 @@ public void performTimeOuts() { } } + public void countOpenTxns() throws MetaException { + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + try { + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + String s = "select count(*) from TXNS where txn_state = '" + TXN_OPEN + "'"; + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (!rs.next()) { + LOG.warn("Transaction database not properly configured, " + + "can't find txn_state from TXNS."); + } else { + numOpenTxns = rs.getLong(1); + } + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + LOG.info("Failed to update number of open transactions"); + checkRetryable(dbConn, e, "countOpenTxns()"); + } finally { + close(rs, stmt, dbConn); + } + } catch (RetryException e) { + countOpenTxns(); + } + } + private static synchronized void setupJdbcConnectionPool(HiveConf conf) throws SQLException { if (connPool != null) return; diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index 927e9bc..3112459 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -47,7 +47,7 @@ @InterfaceStability.Evolving public interface TxnStore { - public static enum MUTEX_KEY {Initiator, Cleaner, HouseKeeper, CompactionHistory} + public static enum MUTEX_KEY {Initiator, Cleaner, HouseKeeper, CompactionHistory, OpenTxnsCounter} // Compactor states (Should really be enum) static final public String INITIATED_RESPONSE = "initiated"; static final public String WORKING_RESPONSE = "working"; @@ -77,6 +77,12 @@ public GetOpenTxnsResponse getOpenTxns() throws MetaException; /** + * Get the count for open transactions. + * @throws MetaException + */ + public void countOpenTxns() throws MetaException; + + /** * Open a set of transactions * @param rqst request to open transactions * @return information on opened transactions diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java new file mode 100644 index 0000000..2234f35 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.txn; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.txn.TxnStore; +import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Background running thread, periodically updating number of open transactions. + * Runs inside Hive Metastore Service. + */ +public class AcidOpenTxnsCounterService extends HouseKeeperServiceBase { + private static final Logger LOG = LoggerFactory.getLogger(AcidOpenTxnsCounterService.class); + + @Override + protected long getStartDelayMs() { + return 1000; // in miliseconds + } + @Override + protected long getIntervalMs() { + return hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_COUNT_OPEN_TXNS_INTERVAL, TimeUnit.MILLISECONDS); + } + @Override + protected Runnable getScheduedAction(HiveConf hiveConf, AtomicInteger isAliveCounter) { + return new OpenTxnsCounter(hiveConf, isAliveCounter); + } + + @Override + public String getServiceDescription() { + return "Count number of open transactions"; + } + + private static final class OpenTxnsCounter implements Runnable { + private final TxnStore txnHandler; + private final AtomicInteger isAliveCounter; + private OpenTxnsCounter(HiveConf hiveConf, AtomicInteger isAliveCounter) { + txnHandler = TxnUtils.getTxnStore(hiveConf); + this.isAliveCounter = isAliveCounter; + } + + @Override + public void run() { + TxnStore.MutexAPI.LockHandle handle = null; + try { + handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.OpenTxnsCounter.name()); + long startTime = System.currentTimeMillis(); + txnHandler.countOpenTxns(); + int count = isAliveCounter.incrementAndGet(); + LOG.info("OpenTxnsCounter ran for " + (System.currentTimeMillis() - startTime)/1000 + "seconds. isAliveCounter=" + count); + } + catch(Throwable t) { + LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t); + } + finally { + if(handle != null) { + handle.releaseLocks(); + } + } + } + } +} diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 04c1d17..8c9c259 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -26,12 +26,7 @@ import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HouseKeeperService; -import org.apache.hadoop.hive.metastore.api.CompactionRequest; -import org.apache.hadoop.hive.metastore.api.CompactionType; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; -import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; -import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; +import org.apache.hadoop.hive.metastore.api.*; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.metastore.txn.TxnStore; @@ -39,6 +34,7 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService; +import org.apache.hadoop.hive.ql.txn.AcidOpenTxnsCounterService; import org.apache.hadoop.hive.ql.txn.compactor.Cleaner; import org.apache.hadoop.hive.ql.txn.compactor.Initiator; import org.apache.hadoop.hive.ql.txn.compactor.Worker; @@ -655,7 +651,7 @@ private static void runHouseKeeperService(HouseKeeperService houseKeeperService, while(houseKeeperService.getIsAliveCounter() <= lastCount) { if(iterCount++ >= maxIter) { //prevent test hangs - throw new IllegalStateException("HouseKeeper didn't run after " + iterCount + " waits"); + throw new IllegalStateException("HouseKeeper didn't run after " + (iterCount - 1) + " waits"); } try { Thread.sleep(100);//make sure it has run at least once @@ -724,6 +720,42 @@ public void writeBetweenWorkerAndCleaner() throws Exception { Assert.assertEquals("", expected, runStatementOnDriver("select a,b from " + tblName + " order by a")); } + + @Test + public void testOpenTxnsCounter() throws Exception { + hiveConf.setIntVar(HiveConf.ConfVars.HIVE_MAX_OPEN_TXNS, 3); + hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_COUNT_OPEN_TXNS_INTERVAL, 10, TimeUnit.MILLISECONDS); + TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf); + OpenTxnsResponse openTxnsResponse = txnHandler.openTxns(new OpenTxnRequest(3, "me", "localhost")); + + AcidOpenTxnsCounterService openTxnsCounterService = new AcidOpenTxnsCounterService(); + runHouseKeeperService(openTxnsCounterService, hiveConf); // will update current number of open txns to 3 + + MetaException exception = null; + // This should fail once it finds out the threshold has been reached + try { + txnHandler.openTxns(new OpenTxnRequest(1, "you", "localhost")); + } catch (MetaException e) { + exception = e; + } + Assert.assertNotNull("Opening new transaction shouldn't be allowed", exception); + Assert.assertTrue(exception.getMessage().equals("Maximum allowed number of open transactions has been reached. See hive.max.open.txns.")); + + // After committing the initial txns, and updating current number of open txns back to 0, + // new transactions should be allowed to open + for (long txnid : openTxnsResponse.getTxn_ids()) { + txnHandler.commitTxn(new CommitTxnRequest(txnid)); + } + runHouseKeeperService(openTxnsCounterService, hiveConf); // will update current number of open txns back to 0 + exception = null; + try { + txnHandler.openTxns(new OpenTxnRequest(1, "him", "localhost")); + } catch (MetaException e) { + exception = e; + } + Assert.assertNull(exception); + } + /** * takes raw data and turns it into a string as if from Driver.getResults() * sorts rows in dictionary order