diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 610cf05204..6895ad513e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -70,8 +70,8 @@
   static final private String COMPACTORTHRESHOLD_PREFIX = "compactorthreshold.";
 
   Map tblNameOwnersCache = new HashMap<>();
 
-  private long checkInterval;
+  private long prevStart = -1;
 
   @Override
   public void run() {
@@ -95,9 +95,13 @@ public void run() {
       try {
         handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Initiator.name());
         startedAt = System.currentTimeMillis();
+
+        long checkInterval = (prevStart < 0) ? prevStart : (startedAt - prevStart)/1000;
+        prevStart = startedAt;
+
         //todo: add method to only get current i.e. skip history - more efficient
         ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
-        Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold)
+        Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold, checkInterval)
            .stream().filter(ci -> checkCompactionElig(ci)).collect(Collectors.toSet());
         LOG.debug("Found " + potentials.size() + " potential compactions, " +
            "checking to see if we should compact any of them");
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index b28b57779b..efd9c518b0 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -379,6 +379,13 @@ public void testFindPotentialCompactions() throws Exception {
     }
     assertTrue(sawMyTable);
     assertTrue(sawYourTable);
+    Thread.sleep(1000);
+
+    potentials = txnHandler.findPotentialCompactions(100, 1);
+    assertEquals(0, potentials.size());
+
+    potentials = txnHandler.findPotentialCompactions(100, 3);
+    assertEquals(2, potentials.size());
   }
 
   // TODO test changes to mark cleaned to clean txns and txn_components
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 8253ccb9c9..b60359296e 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -53,14 +53,19 @@ public CompactionTxnHandler() {
    * This will look through the completed_txn_components table and look for partitions or tables
    * that may be ready for compaction. Also, look through txns and txn_components tables for
    * aborted transactions that we should add to the list.
-   * @param maxAborted Maximum number of aborted queries to allow before marking this as a
-   *                   potential compaction.
+   * @param abortedThreshold number of aborted transactions a table or partition must exceed to be flagged as a potential compaction.
    * @return list of CompactionInfo structs. These will not have id, type,
    * or runAs set since these are only potential compactions not actual ones.
   */
   @Override
   @RetrySemantics.ReadOnly
-  public Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException {
+  public Set<CompactionInfo> findPotentialCompactions(int abortedThreshold) throws MetaException {
+    return findPotentialCompactions(abortedThreshold, -1);
+  }
+
+  @Override
+  @RetrySemantics.ReadOnly
+  public Set<CompactionInfo> findPotentialCompactions(int abortedThreshold, long checkInterval) throws MetaException {
     Connection dbConn = null;
     Set<CompactionInfo> response = new HashSet<>();
     Statement stmt = null;
@@ -70,8 +75,9 @@ public CompactionTxnHandler() {
       dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
       stmt = dbConn.createStatement();
       // Check for completed transactions
-      String s = "select distinct ctc_database, ctc_table, " +
-        "ctc_partition from COMPLETED_TXN_COMPONENTS";
+      String s = "select distinct ctc_database, ctc_table, ctc_partition " +
+        "from COMPLETED_TXN_COMPONENTS " + (checkInterval > 0 ?
+        "where " + isWithinCheckInterval("ctc_timestamp", checkInterval) : "");
       LOG.debug("Going to execute query <" + s + ">");
       rs = stmt.executeQuery(s);
       while (rs.next()) {
@@ -88,7 +94,7 @@ public CompactionTxnHandler() {
         "from TXNS, TXN_COMPONENTS " +
         "where txn_id = tc_txnid and txn_state = '" + TXN_ABORTED + "' " +
         "group by tc_database, tc_table, tc_partition " +
-        "having count(*) > " + maxAborted;
+        "having count(*) > " + abortedThreshold;
       LOG.debug("Going to execute query <" + s + ">");
       rs = stmt.executeQuery(s);
 
@@ -105,14 +111,14 @@ public CompactionTxnHandler() {
         dbConn.rollback();
       } catch (SQLException e) {
         LOG.error("Unable to connect to transaction database " + e.getMessage());
-        checkRetryable(dbConn, e, "findPotentialCompactions(maxAborted:" + maxAborted + ")");
+        checkRetryable(dbConn, e, "findPotentialCompactions(maxAborted:" + abortedThreshold + ")");
       } finally {
         close(rs, stmt, dbConn);
       }
       return response;
     } catch (RetryException e) {
-      return findPotentialCompactions(maxAborted);
+      return findPotentialCompactions(abortedThreshold, checkInterval);
     }
   }
 
   /**
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 6281208247..23bd2463a8 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -3913,6 +3913,30 @@ protected long getDbTime(Connection conn) throws MetaException {
     }
   }
 
+  protected String isWithinCheckInterval(String expr, long interval) throws MetaException {
+    String condition;
+    switch (dbProduct) {
+      case DERBY:
+        condition = " {fn TIMESTAMPDIFF(sql_tsi_second, " + expr + ", current_timestamp)} < " + interval;
+        break;
+      case MYSQL:
+      case POSTGRES:
+        condition = expr + " > current_timestamp - interval '" + interval + "' second";
+        break;
+      case SQLSERVER:
+        condition = "DATEDIFF(second, " + expr + ", current_timestamp) < " + interval;
+        break;
+      case ORACLE:
+        condition = expr + " > current_timestamp - numtodsinterval(" + interval + " , 'second')";
+        break;
+      default:
+        String msg = "Unknown database product: " + dbProduct.toString();
+        LOG.error(msg);
+        throw new MetaException(msg);
+    }
+    return condition;
+  }
+
   /**
    * Determine the String that should be used to quote identifiers.
    * @param conn Active connection
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index e840758c9d..41d2e7924b 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -314,13 +314,15 @@ void onRename(String oldCatName, String oldDbName, String oldTabName, String old
    * This will look through the completed_txn_components table and look for partitions or tables
    * that may be ready for compaction. Also, look through txns and txn_components tables for
    * aborted transactions that we should add to the list.
-   * @param maxAborted Maximum number of aborted queries to allow before marking this as a
-   *                   potential compaction.
+   * @param abortedThreshold number of aborted transactions a table or partition must exceed to be flagged as a potential compaction.
    * @return list of CompactionInfo structs. These will not have id, type,
    * or runAs set since these are only potential compactions not actual ones.
    */
   @RetrySemantics.ReadOnly
-  Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException;
+  Set<CompactionInfo> findPotentialCompactions(int abortedThreshold) throws MetaException;
+
+  @RetrySemantics.ReadOnly
+  Set<CompactionInfo> findPotentialCompactions(int abortedThreshold, long checkInterval) throws MetaException;
 
   /**
    * This updates COMPACTION_QUEUE. Set runAs username for the case where the request was
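Reviewer sketch (illustrative, not part of the patch): the Initiator now remembers the wall-clock start of its previous cycle in prevStart, passes a negative checkInterval on the first cycle so the whole COMPLETED_TXN_COMPONENTS table is scanned, and afterwards passes the number of seconds since the previous cycle started. A minimal standalone rendering of that bookkeeping; the class and method names are hypothetical, only the two lines inside nextCheckInterval() mirror the patch.

// Hypothetical stand-in for the bookkeeping added to Initiator.run().
public class CheckIntervalSketch {
  private long prevStart = -1;

  // Returns what the Initiator would pass as checkInterval for this cycle:
  // -1 on the first cycle (no time filter), otherwise seconds since the previous cycle started.
  long nextCheckInterval(long startedAt) {
    long checkInterval = (prevStart < 0) ? prevStart : (startedAt - prevStart) / 1000;
    prevStart = startedAt;
    return checkInterval;
  }

  public static void main(String[] args) throws InterruptedException {
    CheckIntervalSketch sketch = new CheckIntervalSketch();
    System.out.println(sketch.nextCheckInterval(System.currentTimeMillis())); // -1: scan everything
    Thread.sleep(2000);
    System.out.println(sketch.nextCheckInterval(System.currentTimeMillis())); // ~2: only the last two seconds
  }
}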
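Reviewer sketch (illustrative, not part of the patch): with checkInterval > 0, the completed-transaction scan in findPotentialCompactions only reads COMPLETED_TXN_COMPONENTS rows whose ctc_timestamp falls inside the window, using the per-dialect predicate from isWithinCheckInterval. The standalone sketch below mirrors that predicate and prints the query each backend would receive; the DbProduct enum stands in for the metastore's dbProduct field, and IllegalArgumentException replaces MetaException so the example compiles on its own.

// Hypothetical stand-in showing the assembled COMPLETED_TXN_COMPONENTS query per dialect.
public class CheckIntervalSqlSketch {
  enum DbProduct { DERBY, MYSQL, POSTGRES, SQLSERVER, ORACLE }

  static String isWithinCheckInterval(DbProduct dbProduct, String expr, long interval) {
    String condition;
    switch (dbProduct) {
      case DERBY:
        condition = " {fn TIMESTAMPDIFF(sql_tsi_second, " + expr + ", current_timestamp)} < " + interval;
        break;
      case MYSQL:
      case POSTGRES:
        condition = expr + " > current_timestamp - interval '" + interval + "' second";
        break;
      case SQLSERVER:
        condition = "DATEDIFF(second, " + expr + ", current_timestamp) < " + interval;
        break;
      case ORACLE:
        condition = expr + " > current_timestamp - numtodsinterval(" + interval + " , 'second')";
        break;
      default:
        throw new IllegalArgumentException("Unknown database product: " + dbProduct);
    }
    return condition;
  }

  public static void main(String[] args) {
    long checkInterval = 300; // e.g. the previous Initiator cycle started five minutes ago
    for (DbProduct db : DbProduct.values()) {
      String s = "select distinct ctc_database, ctc_table, ctc_partition " +
          "from COMPLETED_TXN_COMPONENTS " + (checkInterval > 0 ?
          "where " + isWithinCheckInterval(db, "ctc_timestamp", checkInterval) : "");
      System.out.println(db + ": " + s);
    }
  }
}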
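Reviewer sketch (illustrative, not part of the patch): TxnStore keeps the existing single-argument findPotentialCompactions and adds an overload taking checkInterval; the old method delegates with -1, which the implementation treats as "no time filter", so existing callers keep their behaviour. A minimal sketch of that sentinel-delegation shape; the interface, class, and element type below are hypothetical, the real methods return Set<CompactionInfo>.

// Hypothetical stand-in for the overload-with-sentinel shape added to TxnStore.
import java.util.Collections;
import java.util.Set;

public class OverloadSketch {
  interface PotentialCompactionFinder {
    Set<String> findPotentialCompactions(int abortedThreshold);
    Set<String> findPotentialCompactions(int abortedThreshold, long checkInterval);
  }

  static class Finder implements PotentialCompactionFinder {
    @Override
    public Set<String> findPotentialCompactions(int abortedThreshold) {
      return findPotentialCompactions(abortedThreshold, -1); // old contract: scan everything
    }

    @Override
    public Set<String> findPotentialCompactions(int abortedThreshold, long checkInterval) {
      boolean timeFiltered = checkInterval > 0; // only then is the ctc_timestamp predicate added
      System.out.println("threshold=" + abortedThreshold + ", timeFiltered=" + timeFiltered);
      return Collections.emptySet();
    }
  }

  public static void main(String[] args) {
    Finder finder = new Finder();
    finder.findPotentialCompactions(10);      // timeFiltered=false
    finder.findPotentialCompactions(10, 120); // timeFiltered=true
  }
}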