diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 5e4c7be..56aed28 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -437,23 +437,21 @@ public void cleanEmptyAbortedTxns() throws MetaException {
           "txn_state = '" + TXN_ABORTED + "'";
         LOG.debug("Going to execute query <" + s + ">");
         rs = stmt.executeQuery(s);
-        Set<Long> txnids = new HashSet<Long>();
+        List<Long> txnids = new ArrayList<>();
         while (rs.next()) txnids.add(rs.getLong(1));
-        if (txnids.size() > 0) {
-          StringBuilder buf = new StringBuilder("delete from TXNS where txn_id in (");
-          boolean first = true;
-          for (long tid : txnids) {
-            if (first) first = false;
-            else buf.append(", ");
-            buf.append(tid);
-          }
-          buf.append(")");
-          String bufStr = buf.toString();
-          LOG.debug("Going to execute update <" + bufStr + ">");
-          int rc = stmt.executeUpdate(bufStr);
-          LOG.info("Removed " + rc + " empty Aborted transactions: " + txnids + " from TXNS");
-          LOG.debug("Going to commit");
-          dbConn.commit();
+        close(rs);
+        if(txnids.size() <= 0) {
+          return;
+        }
+        for(int i = 0; i < txnids.size() / TIMED_OUT_TXN_ABORT_BATCH_SIZE; i++) {
+          List<Long> txnIdBatch = txnids.subList(i * TIMED_OUT_TXN_ABORT_BATCH_SIZE,
+            (i + 1) * TIMED_OUT_TXN_ABORT_BATCH_SIZE);
+          deleteTxns(dbConn, stmt, txnIdBatch);
+        }
+        int partialBatchSize = txnids.size() % TIMED_OUT_TXN_ABORT_BATCH_SIZE;
+        if(partialBatchSize > 0) {
+          List<Long> txnIdBatch = txnids.subList(txnids.size() - partialBatchSize, txnids.size());
+          deleteTxns(dbConn, stmt, txnIdBatch);
         }
       } catch (SQLException e) {
         LOG.error("Unable to delete from txns table " + e.getMessage());
@@ -469,6 +467,18 @@ public void cleanEmptyAbortedTxns() {
       cleanEmptyAbortedTxns();
     }
   }
+  private static void deleteTxns(Connection dbConn, Statement stmt, List<Long> txnIdBatch) throws SQLException {
+    StringBuilder buf = new StringBuilder("delete from TXNS where txn_id in (");
+    for(long txnid : txnIdBatch) {
+      buf.append(txnid).append(',');
+    }
+    buf.setCharAt(buf.length() - 1, ')');
+    LOG.debug("Going to execute update <" + buf + ">");
+    int rc = stmt.executeUpdate(buf.toString());
+    LOG.info("Removed " + rc + " empty Aborted transactions: " + txnIdBatch + " from TXNS");
+    LOG.debug("Going to commit");
+    dbConn.commit();
+  }
 
   /**
    * This will take all entries assigned to workers
diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 7f8cb71..cb5bbfb 100644
--- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -22,6 +22,8 @@
 import org.apache.commons.dbcp.ConnectionFactory;
 import org.apache.commons.dbcp.DriverManagerConnectionFactory;
 import org.apache.commons.dbcp.PoolableConnectionFactory;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hadoop.hive.common.classification.InterfaceStability;
 import org.apache.tools.ant.taskdefs.Java;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,6 +61,8 @@
  * Currently the DB schema has this NOT NULL) and only update/read heartbeat from corresponding
 * transaction in TXNS.
 */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
 public class TxnHandler {
   // Compactor states
   static final public String INITIATED_RESPONSE = "initiated";
@@ -87,7 +91,7 @@ public class TxnHandler {
   static final protected char LOCK_SEMI_SHARED = 'w';
 
   static final private int ALLOWED_REPEATED_DEADLOCKS = 10;
-  static final private int TIMED_OUT_TXN_ABORT_BATCH_SIZE = 100;
+  public static final int TIMED_OUT_TXN_ABORT_BATCH_SIZE = 1000;
   static final private Logger LOG = LoggerFactory.getLogger(TxnHandler.class.getName());
   static private DataSource connPool;
 
@@ -2046,7 +2050,7 @@ public void performTimeOuts() {
       stmt = dbConn.createStatement();
       String s = " txn_id from TXNS where txn_state = '" + TXN_OPEN +
         "' and txn_last_heartbeat < " + (now - timeout);
-      s = addLimitClause(dbConn, 2500, s);
+      s = addLimitClause(dbConn, 250 * TIMED_OUT_TXN_ABORT_BATCH_SIZE, s);
       LOG.debug("Going to execute query <" + s + ">");
       rs = stmt.executeQuery(s);
       if(!rs.next()) {
diff --git ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index e9b4154..03a6494 100644
--- ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
+import org.apache.hadoop.hive.metastore.txn.TxnHandler;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -204,12 +205,12 @@ public void cleanEmptyAbortedTxns() throws Exception {
     LockResponse res = txnHandler.lock(req);
     txnHandler.abortTxn(new AbortTxnRequest(txnid));
 
-    for (int i = 0; i < 100; i++) {
+    for (int i = 0; i < TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50; i++) {
       txnid = openTxn();
       txnHandler.abortTxn(new AbortTxnRequest(txnid));
     }
     GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns();
-    Assert.assertEquals(101, openTxns.getOpen_txnsSize());
+    Assert.assertEquals(TxnHandler.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50 + 1, openTxns.getOpen_txnsSize());
 
     startInitiator();
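
For context, the following is a minimal standalone sketch of the batching pattern this patch introduces in cleanEmptyAbortedTxns()/deleteTxns(): carve a large id list into fixed-size chunks with List.subList(), then issue one bounded "delete ... in (...)" statement per chunk instead of a single unbounded IN list. The class name BatchDeleteSketch, the small BATCH_SIZE value, and the main() driver are illustrative stand-ins only, not part of the patch; the real code executes the statements through a live JDBC Statement.

import java.util.ArrayList;
import java.util.List;

public class BatchDeleteSketch {
  // Stand-in for TIMED_OUT_TXN_ABORT_BATCH_SIZE; kept tiny so the demo prints several batches.
  static final int BATCH_SIZE = 4;

  // Builds one DELETE per batch, mirroring the shape of deleteTxns() in the patch.
  static List<String> buildDeletes(List<Long> txnids) {
    List<String> statements = new ArrayList<>();
    // Integer division yields the number of complete batches.
    for (int i = 0; i < txnids.size() / BATCH_SIZE; i++) {
      statements.add(toDelete(txnids.subList(i * BATCH_SIZE, (i + 1) * BATCH_SIZE)));
    }
    // Trailing partial batch when the list size is not an exact multiple of BATCH_SIZE.
    int partial = txnids.size() % BATCH_SIZE;
    if (partial > 0) {
      statements.add(toDelete(txnids.subList(txnids.size() - partial, txnids.size())));
    }
    return statements;
  }

  private static String toDelete(List<Long> batch) {
    StringBuilder buf = new StringBuilder("delete from TXNS where txn_id in (");
    for (long txnid : batch) {
      buf.append(txnid).append(',');
    }
    // Overwrite the trailing comma with the closing parenthesis, as deleteTxns() does;
    // callers guarantee the batch is non-empty, so a comma is always present.
    buf.setCharAt(buf.length() - 1, ')');
    return buf.toString();
  }

  public static void main(String[] args) {
    List<Long> ids = new ArrayList<>();
    for (long i = 1; i <= 10; i++) ids.add(i);
    // 10 ids with BATCH_SIZE 4 -> batches (1..4), (5..8) and a partial batch (9, 10).
    buildDeletes(ids).forEach(System.out::println);
  }
}

Bounding each IN list this way keeps every statement within whatever SQL statement-length limits the backing RDBMS imposes, which appears to be the motivation for routing all deletes through TIMED_OUT_TXN_ABORT_BATCH_SIZE-sized chunks.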