diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 420d35e..83eeac0 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1848,6 +1848,10 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal
         + "version of Hive ACID subsystem. Mostly it is intended to be used as an internal property\n"
         + "for future versions of ACID. (See HIVE-14035 for details.)"),
 
+    HIVE_ACID_METRICS_COLLECTION_INTERVAL("hive.acid.metrics.collection.interval", "30s",
+        new TimeValidator(TimeUnit.SECONDS), "Time in seconds between runs to monitor and collect\n" +
+        "ACID metrics."),
+
     HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 100000, "Maximum number of open transactions. If \n" +
       "current open transactions reach this limit, future open transaction requests will be \n" +
       "rejected, until this number goes below the limit."),
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index a8fa1ef..9a4f855 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -7415,6 +7415,7 @@ private static void startHouseKeeperService(HiveConf conf) throws Exception {
     startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService"));
     startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService"));
     startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidWriteSetService"));
+    startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidMetricsService"));
   }
   private static void startHouseKeeperService(HiveConf conf, Class c) throws Exception {
     //todo: when metastore adds orderly-shutdown logic, houseKeeper.stop()
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 14c834c..10081d7 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -3131,6 +3131,142 @@ public void countOpenTxns() throws MetaException {
     }
   }
 
+  @Override
+  @RetrySemantics.ReadOnly
+  public void monitorAcidAndCollectMetrics() throws MetaException {
+    String ACID_METRICS = "ACID_METRICS: ";
+    long TXN_COMPACTION_GAP_THRESHOLD = 60000; // 1 minute, in milliseconds
+    Connection dbConn = null;
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      try {
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+
+        // Detect if there are too many open transactions
+        // This is covered by OpenTxnsCounterService (countOpenTxns)
+
+        // TODO Generate stats for compactions about number of files it reads, min/max/avg size etc
+
+        // TODO Log an error if the system is not configured correctly (need to be specific)
+
+        // Monitor the number of aborted transactions
+        String s = "select count(*) from txns where txn_state=" + quoteChar(TXN_ABORTED);
+        LOG.debug("Going to execute query <" + s + ">");
+        rs = stmt.executeQuery(s);
+        long numAbortedTxns = 0;
+        if (!rs.next()) {
+          LOG.info("Transaction table txns is corrupted, as it is not queryable.");
+        } else {
+          numAbortedTxns = rs.getLong(1);
+          LOG.info(ACID_METRICS + "Number of aborted transactions is " + numAbortedTxns);
+          // If there are too many aborted transactions, it's an indication something is wrong,
+          // like an incorrectly configured streaming client
+        }
+
+        // Monitor the number of open transactions
+        s = "select count(*), min(txn_id), max(txn_id) from txns where txn_state=" +
+            quoteChar(TXN_OPEN);
+        LOG.debug("Going to execute query <" + s + ">");
+        rs = stmt.executeQuery(s);
+        long numOpenTxns = 0;
+        long lowestOpenTxnId = 0;
+        long highestOpenTxnId = 0;
+        if (!rs.next()) {
+          LOG.info("Transaction table txns is corrupted, as it is not queryable.");
+        } else if (rs.getLong(1) > 0) {
+          numOpenTxns = rs.getLong(1);
+          lowestOpenTxnId = rs.getLong(2);
+          highestOpenTxnId = rs.getLong(3);
+          LOG.info(ACID_METRICS + "Number of open transactions is " + numOpenTxns);
+          LOG.info(ACID_METRICS + "Difference between the earliest and the latest open " +
+              "transaction ids is " + (highestOpenTxnId - lowestOpenTxnId));
+          // If there are too many open transactions, it's an indication the system is busy
+        }
+
+        // Monitor the number of initiated compactions
+        s = "select count(*) from compaction_queue where cq_state=" + quoteChar(INITIATED_STATE);
+        LOG.debug("Going to execute query <" + s + ">");
+        rs = stmt.executeQuery(s);
+        long numInitedCompactions = 0;
+        if (!rs.next()) {
+          LOG.info("Transaction table compaction_queue is corrupted, as it is not queryable.");
+        } else {
+          numInitedCompactions = rs.getLong(1);
+          LOG.info(ACID_METRICS + "Number of initiated compactions is " + numInitedCompactions);
+          // If there are too many initiated compactions, it's an indication compaction is too busy
+        }
+
+        // Monitor the number of working compactions
+        s = "select count(*) from compaction_queue where cq_state=" + quoteChar(WORKING_STATE);
+        LOG.debug("Going to execute query <" + s + ">");
+        rs = stmt.executeQuery(s);
+        long numWorkingCompactions = 0;
+        if (!rs.next()) {
+          LOG.info("Transaction table compaction_queue is corrupted, as it is not queryable.");
+        } else {
+          numWorkingCompactions = rs.getLong(1);
+          LOG.info(ACID_METRICS + "Number of working compactions is " + numWorkingCompactions);
+          // If there are too many working compactions, it's an indication compaction is too busy
+        }
+
+        // Detect whether compaction does not appear to be running
+        s = "select max(txn_started) from txns";
+        LOG.debug("Going to execute query <" + s + ">");
+        rs = stmt.executeQuery(s);
+        long latestStartedTxn = 0;
+        if (!rs.next()) {
+          LOG.info("Transaction table txns is corrupted, as it is not queryable.");
+        } else {
+          latestStartedTxn = rs.getLong(1);
+          LOG.info(ACID_METRICS + "The most recent transaction was opened at " + latestStartedTxn);
+        }
+
+        s = "select max(cq_start) from compaction_queue";
+        LOG.debug("Going to execute query <" + s + ">");
+        rs = stmt.executeQuery(s);
+        long latestStartedCompaction = 0;
+        if (!rs.next()) {
+          LOG.info("Transaction table compaction_queue is corrupted, as it is not queryable.");
+        } else {
+          latestStartedCompaction = rs.getLong(1);
+          LOG.info(ACID_METRICS + "The most recent compaction was started at " + latestStartedCompaction);
+        }
+
+        // Guard against an empty compaction_queue, where max(cq_start) comes back as NULL/0
+        long gap = latestStartedTxn - latestStartedCompaction;
+        if (latestStartedCompaction > 0 && gap > TXN_COMPACTION_GAP_THRESHOLD) {
+          LOG.warn(ACID_METRICS + "There is a " + gap + " ms gap between the latest started transaction at " +
+              latestStartedTxn + " and the latest compaction at " + latestStartedCompaction);
+        }
+
+        // Monitor if there are too many locks being held
+        s = "select count(*) from hive_locks";
+        LOG.debug("Going to execute query <" + s + ">");
+        rs = stmt.executeQuery(s);
+        long numLocks = 0;
+        if (!rs.next()) {
+          LOG.info("Transaction table hive_locks is corrupted, as it is not queryable.");
+        } else {
+          numLocks = rs.getLong(1);
+          LOG.info(ACID_METRICS + "Number of locks being held or waited on is " + numLocks);
+          // If this number exceeds a certain threshold, it's an indication of deadlock or contention
+        }
+
+
+      } catch (SQLException e) {
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        LOG.info("Failed to collect ACID metrics: " + e.getMessage());
+        checkRetryable(dbConn, e, "monitorAcidAndCollectMetrics()");
+      } finally {
+        close(rs, stmt, dbConn);
+      }
+    } catch (RetryException e) {
+      monitorAcidAndCollectMetrics();
+    }
+  }
+
   private static synchronized void setupJdbcConnectionPool(HiveConf conf) throws SQLException {
     if (connPool != null) return;
 
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 0b0df85..698dea1 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -430,4 +430,11 @@ public void cleanupRecords(HiveObjectType type, Database db, Table table,
    */
   @RetrySemantics.Idempotent
   public void setHadoopJobId(String hadoopJobId, long id);
+
+  /**
+   * Collect various kinds of ACID metrics and write them to the log.
+   * @throws MetaException
+   */
+  @RetrySemantics.ReadOnly
+  public void monitorAcidAndCollectMetrics() throws MetaException;
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/txn/AcidMetricsService.java ql/src/java/org/apache/hadoop/hive/ql/txn/AcidMetricsService.java
new file mode 100644
index 0000000..dee4df8
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/txn/AcidMetricsService.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Background running thread, periodically collecting ACID metrics and writing them to the log.
+ * Runs inside Hive Metastore Service.
+ */
+public class AcidMetricsService extends HouseKeeperServiceBase {
+  private static final Logger LOG = LoggerFactory.getLogger(AcidMetricsService.class);
+  @Override
+  protected long getStartDelayMs() {
+    return 100; // in milliseconds
+  }
+  @Override
+  protected long getIntervalMs() {
+    return hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ACID_METRICS_COLLECTION_INTERVAL, TimeUnit.MILLISECONDS);
+  }
+  @Override
+  protected Runnable getScheduedAction(HiveConf hiveConf, AtomicInteger isAliveCounter) {
+    return new AcidMetricsCollector(hiveConf, isAliveCounter);
+  }
+  @Override
+  public String getServiceDescription() {
+    return "Monitor the ACID subsystem and collect metrics";
+  }
+  private static final class AcidMetricsCollector implements Runnable {
+    private static volatile long lastLogTime = 0;
+    private final TxnStore txnHandler;
+    private final AtomicInteger isAliveCounter;
+    private AcidMetricsCollector(HiveConf hiveConf, AtomicInteger isAliveCounter) {
+      txnHandler = TxnUtils.getTxnStore(hiveConf);
+      this.isAliveCounter = isAliveCounter;
+    }
+    @Override
+    public void run() {
+      try {
+        long startTime = System.currentTimeMillis();
+        txnHandler.monitorAcidAndCollectMetrics();
+        int count = isAliveCounter.incrementAndGet();
+        if (System.currentTimeMillis() - lastLogTime > 60 * 1000) {
+          // don't flood the logs with too many msgs
+          LOG.info("AcidMetricsCollector ran for " + (System.currentTimeMillis() - startTime) / 1000 +
+              " seconds. isAliveCounter=" + count);
+          lastLogTime = System.currentTimeMillis();
+        }
+      }
+      catch (Throwable t) {
+        LOG.error("Serious error in " + Thread.currentThread().getName() + ": " + t.getMessage(), t);
+      }
+    }
+  }
+}
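
For reference, a minimal smoke-test sketch of how the new pieces fit together. This is not part of the patch: the class name AcidMetricsSmokeTest is made up, and it assumes the patch above is applied and that a metastore backing database with the ACID tables is reachable through the usual hive-site.xml settings. It reads the new interval property and then invokes the same TxnStore call that AcidMetricsService schedules; the metrics go to the metastore log rather than being returned. In normal operation nothing calls this directly, since HiveMetaStore starts AcidMetricsService alongside the other housekeeping threads, as the HiveMetaStore.java hunk shows.

// Hypothetical smoke test (illustration only, not part of the patch).
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;

import java.util.concurrent.TimeUnit;

public class AcidMetricsSmokeTest {
  public static void main(String[] args) throws Exception {
    // Assumes hive-site.xml on the classpath points at a metastore database that
    // already has the ACID tables (txns, compaction_queue, hive_locks).
    HiveConf conf = new HiveConf();

    // The new property defaults to 30s; AcidMetricsService reads it via getTimeVar.
    long intervalMs = conf.getTimeVar(
        HiveConf.ConfVars.HIVE_ACID_METRICS_COLLECTION_INTERVAL, TimeUnit.MILLISECONDS);
    System.out.println("hive.acid.metrics.collection.interval = " + intervalMs + " ms");

    // One-off invocation of the call that AcidMetricsService runs periodically;
    // output appears in the log under the "ACID_METRICS: " prefix, nothing is returned.
    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
    txnHandler.monitorAcidAndCollectMetrics();
  }
}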