diff --git a/metastore/scripts/upgrade/derby/053-HIVE-18747.derby.sql b/metastore/scripts/upgrade/derby/053-HIVE-18747.derby.sql
new file mode 100644
index 0000000..6b8022b
--- /dev/null
+++ b/metastore/scripts/upgrade/derby/053-HIVE-18747.derby.sql
@@ -0,0 +1,7 @@
+CREATE TABLE MIN_HISTORY_LEVEL (
+  MHL_TXNID bigint NOT NULL,
+  MHL_MIN_OPEN_TXNID bigint NOT NULL,
+  PRIMARY KEY(MHL_TXNID)
+);
+
+CREATE INDEX MIN_HISTORY_LEVEL_IDX ON MIN_HISTORY_LEVEL (MHL_MIN_OPEN_TXNID);
diff --git a/metastore/scripts/upgrade/derby/hive-txn-schema-3.0.0.derby.sql b/metastore/scripts/upgrade/derby/hive-txn-schema-3.0.0.derby.sql
index 99838b4..22b6561 100644
--- a/metastore/scripts/upgrade/derby/hive-txn-schema-3.0.0.derby.sql
+++ b/metastore/scripts/upgrade/derby/hive-txn-schema-3.0.0.derby.sql
@@ -73,6 +73,14 @@ CREATE TABLE NEXT_WRITE_ID (
 
 CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
 
+CREATE TABLE MIN_HISTORY_LEVEL (
+  MHL_TXNID bigint NOT NULL,
+  MHL_MIN_OPEN_TXNID bigint NOT NULL,
+  PRIMARY KEY(MHL_TXNID)
+);
+
+CREATE INDEX MIN_HISTORY_LEVEL_IDX ON MIN_HISTORY_LEVEL (MHL_MIN_OPEN_TXNID);
+
 CREATE TABLE HIVE_LOCKS (
   HL_LOCK_EXT_ID bigint NOT NULL,
   HL_LOCK_INT_ID bigint NOT NULL,
diff --git a/metastore/scripts/upgrade/derby/upgrade-2.3.0-to-3.0.0.derby.sql b/metastore/scripts/upgrade/derby/upgrade-2.3.0-to-3.0.0.derby.sql
index 1e4dd99..cdac464 100644
--- a/metastore/scripts/upgrade/derby/upgrade-2.3.0-to-3.0.0.derby.sql
+++ b/metastore/scripts/upgrade/derby/upgrade-2.3.0-to-3.0.0.derby.sql
@@ -10,5 +10,6 @@ RUN '049-HIVE-18489.derby.sql';
 RUN '050-HIVE-18192.derby.sql';
 RUN '051-HIVE-18675.derby.sql';
 RUN '052-HIVE-18965.derby.sql';
+RUN '053-HIVE-18747.derby.sql';
 
 UPDATE "APP".VERSION SET SCHEMA_VERSION='3.0.0', VERSION_COMMENT='Hive release version 3.0.0' where VER_ID=1;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index df9a5a0..4d51bbc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -263,15 +263,15 @@ private void clean(CompactionInfo ci) throws MetaException {
      * Between that check and removeFiles() a query starts (it will be reading D3) and another compaction
      * completes which creates D4.
      * Now removeFiles() (more specifically AcidUtils.getAcidState()) will declare D3 to be obsolete
-     * unless ValidTxnList is "capped" at highestWriteId.
+     * unless ValidWriteIdList is "capped" at highestWriteId.
      */
-    final ValidWriteIdList txnList = (ci.highestWriteId > 0)
+    final ValidWriteIdList validWriteIdList = (ci.highestWriteId > 0)
         ? new ValidReaderWriteIdList(ci.getFullTableName(), new long[0], new BitSet(), ci.highestWriteId)
         : new ValidReaderWriteIdList();
 
     if (runJobAsSelf(ci.runAs)) {
-      removeFiles(location, txnList);
+      removeFiles(location, validWriteIdList);
     } else {
       LOG.info("Cleaning as user " + ci.runAs + " for " + ci.getFullPartitionName());
       UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.runAs,
@@ -279,7 +279,7 @@ private void clean(CompactionInfo ci) throws MetaException {
       ugi.doAs(new PrivilegedExceptionAction<Object>() {
         @Override
         public Object run() throws Exception {
-          removeFiles(location, txnList);
+          removeFiles(location, validWriteIdList);
           return null;
         }
       });
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index a04ac3b..22765b8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -173,6 +173,9 @@ public void run() {
 
           // Clean anything from the txns table that has no components left in txn_components.
           txnHandler.cleanEmptyAbortedTxns();
+
+          // Clean up TXN_TO_WRITE_ID entries below min_uncommitted_txnid, i.e. no longer referred to by any open txn.
+          txnHandler.cleanTxnToWriteIdTable();
         } catch (Throwable t) {
           LOG.error("Initiator loop caught unexpected exception this time through the loop: " +
               StringUtils.stringifyException(t));
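
Reviewer sketch (not part of the patch): the Cleaner.java change above caps the ValidWriteIdList at ci.highestWriteId so that a delta created by a compaction that finished after this one was queued cannot cause still-needed directories to be declared obsolete. A minimal illustration, using the same constructor the patch uses and a hypothetical table name default.acidtbl:

    import java.util.BitSet;
    import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
    import org.apache.hadoop.hive.common.ValidWriteIdList;

    public class CappedWriteIdListSketch {
      public static void main(String[] args) {
        long highestWriteId = 3; // ci.highestWriteId recorded when this compaction ran
        ValidWriteIdList capped =
            new ValidReaderWriteIdList("default.acidtbl", new long[0], new BitSet(), highestWriteId);
        System.out.println(capped.isWriteIdValid(3)); // true: D3 is visible, as it was when compaction ran
        System.out.println(capped.isWriteIdValid(4)); // false: D4, created later, is ignored, so
                                                      // D3 is not declared obsolete prematurely
      }
    }
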
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index b832f71..a980559 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -52,6 +52,8 @@
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.metastore.txn.AcidOpenTxnsCounterService;
@@ -2027,6 +2029,112 @@ public void testMmTableCompaction() throws Exception {
     verifyDirAndResult(2);
   }
 
+  /**
+   * Test the cleaner for the TXN_TO_WRITE_ID table.
+   * @throws Exception
+   */
+  @Test
+  public void testCleanerForTxnToWriteId() throws Exception {
+    int[][] tableData1 = {{1,2}};
+    int[][] tableData2 = {{2,3}};
+    int[][] tableData3 = {{3,4}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData1));
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData2));
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData3));
+    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p=1) (a,b) " + makeValuesClause(tableData1));
+    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p=2) (a,b) " + makeValuesClause(tableData2));
+
+    // All inserts are committed, so we expect 3 entries in TXN_TO_WRITE_ID for acidTbl and
+    // 2 entries for acidTblPart, as each insert allocates a writeid.
+    // MIN_HISTORY_LEVEL should have no entries as there are no open txns referencing it.
+    String acidTblWhereClause = " where t2w_database = " + quoteString("default")
+        + " and t2w_table = " + quoteString(Table.ACIDTBL.name().toLowerCase());
+    String acidTblPartWhereClause = " where t2w_database = " + quoteString("default")
+        + " and t2w_table = " + quoteString(Table.ACIDTBLPART.name().toLowerCase());
+    Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from MIN_HISTORY_LEVEL"),
+        0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from MIN_HISTORY_LEVEL"));
+    Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblWhereClause),
+        3, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblWhereClause));
+    Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblPartWhereClause),
+        2, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblPartWhereClause));
+
+    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+    txnHandler.compact(new CompactionRequest("default", Table.ACIDTBL.name().toLowerCase(), CompactionType.MAJOR));
+    runWorker(hiveConf);
+    runCleaner(hiveConf);
+    txnHandler.cleanTxnToWriteIdTable();
+
+    // After compaction/cleanup, all entries from TXN_TO_WRITE_ID should be cleaned up as all txns are committed.
+    Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID"),
+        0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID"));
+
+    // Run the following sequence: commit, abort, open, abort, commit.
+    int[][] tableData4 = {{4,5}};
+    int[][] tableData5 = {{5,6}};
+    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p=3) (a,b) " + makeValuesClause(tableData3));
+
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData4));
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
+
+    // Keep an open txn which refers to the aborted txn.
+    Context ctx = new Context(hiveConf);
+    HiveTxnManager txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(hiveConf);
+    txnMgr.openTxn(ctx, "u1");
+    txnMgr.getValidTxns();
+
+    // Start an INSERT statement transaction and roll back this transaction.
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData5));
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
+
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData5));
+
+    // We expect 4 entries in TXN_TO_WRITE_ID (3 for acidTbl, 1 for acidTblPart), as each insert,
+    // including the aborted ones, allocated a writeid.
+    // MIN_HISTORY_LEVEL will have 1 entry for the open txn.
+    Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblWhereClause),
+        3, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblWhereClause));
+    Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblPartWhereClause),
+        1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblPartWhereClause));
+    Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from MIN_HISTORY_LEVEL"),
+        1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from MIN_HISTORY_LEVEL"));
+
+    // The entries for the aborted txns shouldn't be removed from TXN_TO_WRITE_ID yet, as aborted txns
+    // are removed from TXNS only after compaction. The committed txn > the open txn is retained too.
+    // As the open txn doesn't allocate a writeid, the entries for the aborted and committed txns
+    // should be retained.
+    txnHandler.cleanEmptyAbortedTxns();
+    txnHandler.cleanTxnToWriteIdTable();
+    Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblWhereClause),
+        3, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblWhereClause));
+    Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblPartWhereClause),
+        0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblPartWhereClause));
+
+    // The cleaner will remove the aborted txns' data/metadata, but cannot remove aborted txn2 from
+    // TXN_TO_WRITE_ID as there is an open txn < aborted txn2. Aborted txn1 < the open txn and so is
+    // removed. The committed txn > the open txn is retained as well.
+    // MIN_HISTORY_LEVEL will have 1 entry for the open txn.
+    txnHandler.compact(new CompactionRequest("default", Table.ACIDTBL.name().toLowerCase(), CompactionType.MAJOR));
+    runWorker(hiveConf);
+    runCleaner(hiveConf);
+    txnHandler.cleanEmptyAbortedTxns();
+    txnHandler.cleanTxnToWriteIdTable();
+    Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID"),
+        2, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID"));
+    Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from MIN_HISTORY_LEVEL"),
+        1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from MIN_HISTORY_LEVEL"));
+
+    // Commit the open txn, which enables cleanup of TXN_TO_WRITE_ID.
+    // Now all txns are removed from MIN_HISTORY_LEVEL, so all entries from TXN_TO_WRITE_ID can be cleaned.
+    txnMgr.commitTxn();
+    txnHandler.cleanTxnToWriteIdTable();
+
+    Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID"),
+        0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID"));
+    Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from MIN_HISTORY_LEVEL"),
+        0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from MIN_HISTORY_LEVEL"));
+  }
+
   private void verifyDirAndResult(int expectedDeltas) throws Exception {
     FileSystem fs = FileSystem.get(hiveConf);
     // Verify the content of subdirs
@@ -2127,4 +2235,8 @@ final void assertUniqueID(Table table) throws Exception {
     List<String> r = runStatementOnDriver(sb.toString());
     Assert.assertTrue("Duplicate ROW__ID: " + r.toString(), r.size() == 0);
   }
+
+  static String quoteString(String input) {
+    return "'" + input + "'";
+  }
 }
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index ba006cf..4e3068d 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -481,7 +481,85 @@ public void markCleaned(CompactionInfo info) throws MetaException {
   }
 
   /**
-   * Clean up aborted transactions from txns that have no components in txn_components. The reason such
+   * Clean up entries from the TXN_TO_WRITE_ID table below min_uncommitted_txnid as found by
+   * min(NEXT_TXN_ID.ntxn_next, min(MIN_HISTORY_LEVEL.mhl_min_open_txnid), min(Aborted TXNS.txn_id)).
+   */
+  @Override
+  @RetrySemantics.SafeToRetry
+  public void cleanTxnToWriteIdTable() throws MetaException {
+    try {
+      Connection dbConn = null;
+      Statement stmt = null;
+      ResultSet rs = null;
+
+      try {
+        // We query for minimum values in all the queries, and they can only increase under any
+        // concurrent operations. So, READ COMMITTED is sufficient.
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+
+        // First we need to find the min_uncommitted_txnid, i.e. the lowest txnid still seen as
+        // uncommitted by any open transaction. If no txns are currently open or aborted in the
+        // system, then the current value of NEXT_TXN_ID.ntxn_next is the min_uncommitted_txnid.
+        String s = "select ntxn_next from NEXT_TXN_ID";
+        LOG.debug("Going to execute query <" + s + ">");
+        rs = stmt.executeQuery(s);
+        if (!rs.next()) {
+          throw new MetaException("Transaction tables not properly " +
+              "initialized, no record found in next_txn_id");
+        }
+        long minUncommittedTxnId = rs.getLong(1);
+
+        // If there are any open txns, then the minimum of min_open_txnid from the MIN_HISTORY_LEVEL
+        // table is the min_uncommitted_txnid if it is less than NEXT_TXN_ID.ntxn_next.
+        s = "select min(mhl_min_open_txnid) from MIN_HISTORY_LEVEL";
+        LOG.debug("Going to execute query <" + s + ">");
+        rs = stmt.executeQuery(s);
+        if (rs.next()) {
+          long minOpenTxnId = rs.getLong(1);
+          if (minOpenTxnId > 0) {
+            minUncommittedTxnId = Math.min(minOpenTxnId, minUncommittedTxnId);
+          }
+        }
+
+        // If there are aborted txns, then the minimum aborted txnid is the min_uncommitted_txnid
+        // if it is less than both NEXT_TXN_ID.ntxn_next and min(MIN_HISTORY_LEVEL.mhl_min_open_txnid).
+ s = "select min(txn_id) from TXNS where txn_state = " + quoteChar(TXN_ABORTED); + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (rs.next()) { + long minAbortedTxnId = rs.getLong(1); + if (minAbortedTxnId > 0) { + minUncommittedTxnId = Math.min(minAbortedTxnId, minUncommittedTxnId); + } + } + + // As all txns below min_uncommitted_txnid are either committed or empty_aborted, we are allowed + // to cleanup the entries less than min_uncommitted_txnid from the TXN_TO_WRITE_ID table. + s = "delete from TXN_TO_WRITE_ID where t2w_txnid < " + minUncommittedTxnId; + LOG.debug("Going to execute delete <" + s + ">"); + int rc = stmt.executeUpdate(s); + LOG.info("Removed " + rc + " rows from TXN_TO_WRITE_ID with Txn Low-Water-Mark: " + minUncommittedTxnId); + + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + LOG.error("Unable to delete from txns table " + e.getMessage()); + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "cleanTxnToWriteIdTable"); + throw new MetaException("Unable to connect to transaction database " + + StringUtils.stringifyException(e)); + } finally { + close(rs, stmt, dbConn); + } + } catch (RetryException e) { + cleanTxnToWriteIdTable(); + } + } + + /** + * Clean up aborted transactions from txns that have no components in txn_components. The reason such * txns exist can be that now work was done in this txn (e.g. Streaming opened TransactionBatch and * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called. */ @@ -524,7 +602,6 @@ public void cleanEmptyAbortedTxns() throws MetaException { LOG.info("Removed " + rc + " empty Aborted transactions from TXNS"); } LOG.info("Aborted transactions removed from TXNS: " + txnids); - LOG.debug("Going to commit"); dbConn.commit(); } catch (SQLException e) { diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java index 588f335..309a937 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java @@ -109,6 +109,11 @@ public static void prepDb(Configuration conf) throws Exception { " NWI_TABLE varchar(256) NOT NULL," + " NWI_NEXT bigint NOT NULL)"); + stmt.execute("CREATE TABLE MIN_HISTORY_LEVEL (" + + " MHL_TXNID bigint NOT NULL," + + " MHL_MIN_OPEN_TXNID bigint NOT NULL," + + " PRIMARY KEY(MHL_TXNID))"); + stmt.execute("CREATE TABLE HIVE_LOCKS (" + " HL_LOCK_EXT_ID bigint NOT NULL," + " HL_LOCK_INT_ID bigint NOT NULL," + @@ -234,6 +239,7 @@ public static void cleanDb(Configuration conf) throws Exception { success &= dropTable(stmt, "NEXT_TXN_ID", retryCount); success &= dropTable(stmt, "TXN_TO_WRITE_ID", retryCount); success &= dropTable(stmt, "NEXT_WRITE_ID", retryCount); + success &= dropTable(stmt, "MIN_HISTORY_LEVEL", retryCount); success &= dropTable(stmt, "HIVE_LOCKS", retryCount); success &= dropTable(stmt, "NEXT_LOCK_ID", retryCount); success &= dropTable(stmt, "COMPACTION_QUEUE", retryCount); diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index e453e5a..2de29d6 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ 
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index e453e5a..2de29d6 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -529,6 +529,10 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException {
     }
 
     int numTxns = rqst.getNum_txns();
+    if (numTxns <= 0) {
+      throw new MetaException("Invalid input for number of txns: " + numTxns);
+    }
+
     try {
       Connection dbConn = null;
       Statement stmt = null;
@@ -584,6 +588,34 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException {
           LOG.debug("Going to execute update <" + q + ">");
           stmt.execute(q);
         }
+
+        // Need to register the minimum open txnid of the current transactions into the MIN_HISTORY_LEVEL table.
+        s = "select min(txn_id) from TXNS where txn_state = " + quoteChar(TXN_OPEN);
+        LOG.debug("Going to execute query <" + s + ">");
+        rs = stmt.executeQuery(s);
+        if (!rs.next()) {
+          throw new IllegalStateException("Scalar query returned no rows?!?!!");
+        }
+
+        // The TXNS table should have at least one entry because we just inserted the newly opened txns.
+        // So, min(txn_id) would be a non-zero txnid.
+        long minOpenTxnId = rs.getLong(1);
+        assert (minOpenTxnId > 0);
+        rows.clear();
+        for (long txnId = first; txnId < first + numTxns; txnId++) {
+          rows.add(txnId + ", " + minOpenTxnId);
+        }
+
+        // Insert transaction entries into MIN_HISTORY_LEVEL.
+        List<String> inserts = sqlGenerator.createInsertValuesStmt(
+            "MIN_HISTORY_LEVEL (mhl_txnid, mhl_min_open_txnid)", rows);
+        for (String insert : inserts) {
+          LOG.debug("Going to execute insert <" + insert + ">");
+          stmt.execute(insert);
+        }
+        LOG.info("Added entries to MIN_HISTORY_LEVEL for current txns: (" + txnIds +
+            ") with min_open_txn: " + minOpenTxnId);
+
         LOG.debug("Going to commit");
         dbConn.commit();
         return new OpenTxnsResponse(txnIds);
@@ -845,8 +877,10 @@ public void commitTxn(CommitTxnRequest rqst)
         s = "delete from TXNS where txn_id = " + txnid;
         LOG.debug("Going to execute update <" + s + ">");
         modCount = stmt.executeUpdate(s);
-        LOG.debug("Going to commit");
-        dbConn.commit();
+        s = "delete from MIN_HISTORY_LEVEL where mhl_txnid = " + txnid;
+        LOG.debug("Going to execute update <" + s + ">");
+        modCount = stmt.executeUpdate(s);
+        LOG.info("Removed committed transaction: (" + txnid + ") from MIN_HISTORY_LEVEL");
 
         // Update registry with modifications
         s = "select ctc_database, ctc_table, ctc_timestamp from COMPLETED_TXN_COMPONENTS where ctc_txnid = " + txnid;
@@ -858,6 +892,8 @@ public void commitTxn(CommitTxnRequest rqst)
             rs.getTimestamp(3, Calendar.getInstance(TimeZone.getTimeZone("UTC"))).getTime());
         }
         close(rs);
+
+        LOG.debug("Going to commit");
         dbConn.commit();
       } catch (SQLException e) {
         LOG.debug("Going to rollback");
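
Reviewer sketch (not part of the patch): the openTxns()/commitTxn() hunks above give MIN_HISTORY_LEVEL a simple lifecycle: each txn gets one row tagged with the lowest open txnid observed at open time, and the row is deleted on commit or abort. A toy model with a TreeMap standing in for the table, assuming txns 7..9 are opened by a single openTxns() call:

    import java.util.NavigableMap;
    import java.util.TreeMap;

    public class MinHistoryLevelSketch {
      public static void main(String[] args) {
        NavigableMap<Long, Long> minHistoryLevel = new TreeMap<>(); // mhl_txnid -> mhl_min_open_txnid
        long minOpenTxnId = 7;                        // min(txn_id) of open txns at openTxns() time
        for (long txnId = 7; txnId <= 9; txnId++) {
          minHistoryLevel.put(txnId, minOpenTxnId);   // insert into MIN_HISTORY_LEVEL ...
        }
        minHistoryLevel.remove(8L);                   // commitTxn(8) deletes its row
        minHistoryLevel.remove(9L);                   // abortTxns([9]) deletes its row
        // cleanTxnToWriteIdTable() reads min(mhl_min_open_txnid); it stays 7 while txn 7 is open:
        System.out.println(minHistoryLevel.firstEntry().getValue()); // 7
      }
    }
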
@@ -943,6 +979,7 @@ private TableValidWriteIds getValidWriteIdsForTable(Statement stmt, String fullT
 
       // Find the writeId high water mark based upon txnId high water mark. If found, then, need to
       // traverse through all write Ids less than writeId HWM to make exceptions list.
+      // The writeHWM = min(NEXT_WRITE_ID.nwi_next-1, max(TXN_TO_WRITE_ID.t2w_writeid under txnHwm))
       String s = "select max(t2w_writeid) from TXN_TO_WRITE_ID where t2w_txnid <= " + txnHwm
           + " and t2w_database = " + quoteString(names[0])
           + " and t2w_table = " + quoteString(names[1]);
@@ -950,36 +987,51 @@ private TableValidWriteIds getValidWriteIdsForTable(Statement stmt, String fullT
       rs = stmt.executeQuery(s);
       if (rs.next()) {
         writeIdHwm = rs.getLong(1);
+      }
 
-        // As writeIdHwm is known, query all writeIds under the writeId HWM.
-        // If any writeId under HWM is allocated by txn > txnId HWM, then will be added to invalid list.
-        // The output of this query includes all the txns which are under the high water mark. It includes
-        // the committed transactions as well. The results should be sorted in ascending order based
-        // on write id. The sorting is needed as exceptions list in ValidWriteIdList would be looked-up
-        // using binary search.
-        s = "select t2w_txnid, t2w_writeid from TXN_TO_WRITE_ID where t2w_writeid <= " + writeIdHwm
-            + " and t2w_database = " + quoteString(names[0])
-            + " and t2w_table = " + quoteString(names[1])
-            + " order by t2w_writeid asc";
-
+      // If no writeIds were allocated by txns under txnHwm, then find the writeId HWM from NEXT_WRITE_ID.
+      if (writeIdHwm <= 0) {
+        // Need to subtract 1 as nwi_next would be the next write id to be allocated, but we need the
+        // highest allocated write id.
+        s = "select nwi_next-1 from NEXT_WRITE_ID where nwi_database = " + quoteString(names[0])
+            + " and nwi_table = " + quoteString(names[1]);
         LOG.debug("Going to execute query<" + s + ">");
         rs = stmt.executeQuery(s);
-        while (rs.next()) {
-          long txnId = rs.getLong(1);
-          long writeId = rs.getLong(2);
-          if (validTxnList.isTxnValid(txnId)) {
-            // Skip if the transaction under evaluation is already committed.
-            continue;
+        if (rs.next()) {
+          long maxWriteId = rs.getLong(1);
+          if (maxWriteId > 0) {
+            writeIdHwm = Math.min(maxWriteId, writeIdHwm);
           }
+        }
+      }
 
-          // The current txn is either in open or aborted state.
-          // Mark the write ids state as per the txn state.
-          invalidWriteIdList.add(writeId);
-          if (validTxnList.isTxnAborted(txnId)) {
-            abortedBits.set(invalidWriteIdList.size() - 1);
-          } else {
-            minOpenWriteId = Math.min(minOpenWriteId, writeId);
-          }
+      // As writeIdHwm is known, query all writeIds under the writeId HWM.
+      // If any writeId under the HWM is allocated by a txn > txnId HWM or belongs to open/aborted txns,
+      // then it will be added to the invalid list. The results should be sorted in ascending order based
+      // on write id. The sorting is needed as the exceptions list in ValidWriteIdList would be looked up
+      // using binary search.
+      s = "select t2w_txnid, t2w_writeid from TXN_TO_WRITE_ID where t2w_writeid <= " + writeIdHwm
+          + " and t2w_database = " + quoteString(names[0])
+          + " and t2w_table = " + quoteString(names[1])
+          + " order by t2w_writeid asc";
+
+      LOG.debug("Going to execute query<" + s + ">");
+      rs = stmt.executeQuery(s);
+      while (rs.next()) {
+        long txnId = rs.getLong(1);
+        long writeId = rs.getLong(2);
+        if (validTxnList.isTxnValid(txnId)) {
+          // Skip if the transaction under evaluation is already committed.
+          continue;
+        }
+
+        // The current txn is either in open or aborted state.
+        // Mark the write ids state as per the txn state.
+        invalidWriteIdList.add(writeId);
+        if (validTxnList.isTxnAborted(txnId)) {
+          abortedBits.set(invalidWriteIdList.size() - 1);
+        } else {
+          minOpenWriteId = Math.min(minOpenWriteId, writeId);
         }
       }
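
Reviewer sketch (not part of the patch): per the comment added in the hunk above, the write-id high-water mark is writeHWM = min(NEXT_WRITE_ID.nwi_next-1, max(TXN_TO_WRITE_ID.t2w_writeid under txnHwm)). With hypothetical values nwi_next = 6 (so the highest allocated write id is 5) and a largest under-txnHwm write id of 4:

    public class WriteIdHwmSketch {
      public static void main(String[] args) {
        long nwiNext = 6;                       // next write id to be allocated for the table
        long maxAllocatedWriteId = nwiNext - 1; // highest write id actually allocated
        long maxWriteIdUnderTxnHwm = 4;         // max(t2w_writeid) where t2w_txnid <= txnHwm
        long writeIdHwm = Math.min(maxAllocatedWriteId, maxWriteIdUnderTxnHwm);
        System.out.println(writeIdHwm);         // 4: write ids above 4 fall outside the snapshot
      }
    }
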
@@ -2827,6 +2879,26 @@ private int abortTxns(Connection dbConn, List<Long> txnids, long max_heartbeat,
       updateCnt += stmt.executeUpdate(query);
     }
 
+    // As the current txns are aborted, they won't read any data from other txns, so it is safe to
+    // unregister their min_open_txnid entries from MIN_HISTORY_LEVEL. Even if only some txns in the
+    // list were actually aborted, it is safe to delete all of them from MIN_HISTORY_LEVEL as the
+    // remaining txns are either already committed or aborted.
+    queries.clear();
+    prefix.setLength(0);
+    suffix.setLength(0);
+
+    prefix.append("delete from MIN_HISTORY_LEVEL where ");
+    suffix.append("");
+
+    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "mhl_txnid", false, false);
+
+    for (String query : queries) {
+      LOG.debug("Going to execute update <" + query + ">");
+      int rc = stmt.executeUpdate(query);
+      LOG.debug("Deleted " + rc + " records from MIN_HISTORY_LEVEL");
+    }
+    LOG.info("Removed aborted transactions: (" + txnids + ") from MIN_HISTORY_LEVEL");
+
     if (updateCnt < txnids.size() && isStrict) {
       /**
        * have to bail in this case since we don't know which transactions were not Aborted and
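
Reviewer sketch (not part of the patch): with the prefix/suffix set up in the abortTxns() hunk above, TxnUtils.buildQueryWithINClause() produces deletes of the following shape, assuming txnids = [11, 12, 13] and that the whole list fits into a single IN clause (the real helper may split long lists across several statements):

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class InClauseDeleteSketch {
      public static void main(String[] args) {
        List<Long> txnids = Arrays.asList(11L, 12L, 13L);
        String query = "delete from MIN_HISTORY_LEVEL where mhl_txnid in ("
            + txnids.stream().map(String::valueOf).collect(Collectors.joining(",")) + ")";
        System.out.println(query); // delete from MIN_HISTORY_LEVEL where mhl_txnid in (11,12,13)
      }
    }
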
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 38fa0e2..e72d327 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -330,6 +330,13 @@ void cleanupRecords(HiveObjectType type, Database db, Table table,
   void markFailed(CompactionInfo info) throws MetaException;
 
   /**
+   * Clean up entries from the TXN_TO_WRITE_ID table below min_uncommitted_txnid as found by
+   * min(NEXT_TXN_ID.ntxn_next, min(MIN_HISTORY_LEVEL.mhl_min_open_txnid), min(Aborted TXNS.txn_id)).
+   */
+  @RetrySemantics.SafeToRetry
+  void cleanTxnToWriteIdTable() throws MetaException;
+
+  /**
    * Clean up aborted transactions from txns that have no components in txn_components. The reason such
    * txns exist can be that no work was done in this txn (e.g. Streaming opened TransactionBatch and
    * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called.
diff --git a/standalone-metastore/src/main/sql/derby/hive-schema-3.0.0.derby.sql b/standalone-metastore/src/main/sql/derby/hive-schema-3.0.0.derby.sql
index de9688d..907e6e8 100644
--- a/standalone-metastore/src/main/sql/derby/hive-schema-3.0.0.derby.sql
+++ b/standalone-metastore/src/main/sql/derby/hive-schema-3.0.0.derby.sql
@@ -545,6 +545,14 @@ CREATE TABLE NEXT_WRITE_ID (
 
 CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
 
+CREATE TABLE MIN_HISTORY_LEVEL (
+  MHL_TXNID bigint NOT NULL,
+  MHL_MIN_OPEN_TXNID bigint NOT NULL,
+  PRIMARY KEY(MHL_TXNID)
+);
+
+CREATE INDEX MIN_HISTORY_LEVEL_IDX ON MIN_HISTORY_LEVEL (MHL_MIN_OPEN_TXNID);
+
 CREATE TABLE "APP"."I_SCHEMA" (
   "SCHEMA_ID" bigint primary key,
   "SCHEMA_TYPE" integer not null,
diff --git a/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql b/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql
index 9f187f9..5215cba 100644
--- a/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql
+++ b/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql
@@ -161,3 +161,12 @@ ALTER TABLE COMPLETED_TXN_COMPONENTS ADD CTC_WRITEID bigint;
 ALTER TABLE "APP"."KEY_CONSTRAINTS" ADD COLUMN "DEFAULT_VALUE" VARCHAR(400);
 
 ALTER TABLE "APP"."HIVE_LOCKS" ALTER COLUMN "HL_TXNID" NOT NULL;
+
+-- HIVE-18747
+CREATE TABLE MIN_HISTORY_LEVEL (
+  MHL_TXNID bigint NOT NULL,
+  MHL_MIN_OPEN_TXNID bigint NOT NULL,
+  PRIMARY KEY(MHL_TXNID)
+);
+
+CREATE INDEX MIN_HISTORY_LEVEL_IDX ON MIN_HISTORY_LEVEL (MHL_MIN_OPEN_TXNID);
diff --git a/standalone-metastore/src/main/sql/mssql/hive-schema-3.0.0.mssql.sql b/standalone-metastore/src/main/sql/mssql/hive-schema-3.0.0.mssql.sql
index 68237ec..ba3ed08 100644
--- a/standalone-metastore/src/main/sql/mssql/hive-schema-3.0.0.mssql.sql
+++ b/standalone-metastore/src/main/sql/mssql/hive-schema-3.0.0.mssql.sql
@@ -1154,6 +1154,17 @@ CREATE TABLE NEXT_WRITE_ID (
 
 CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
 
+CREATE TABLE MIN_HISTORY_LEVEL (
+  MHL_TXNID bigint NOT NULL,
+  MHL_MIN_OPEN_TXNID bigint NOT NULL,
+PRIMARY KEY CLUSTERED
+(
+  MHL_TXNID ASC
+)
+);
+
+CREATE INDEX MIN_HISTORY_LEVEL_IDX ON MIN_HISTORY_LEVEL (MHL_MIN_OPEN_TXNID);
+
 CREATE TABLE "I_SCHEMA" (
   "SCHEMA_ID" bigint primary key,
   "SCHEMA_TYPE" int not null,
diff --git a/standalone-metastore/src/main/sql/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql b/standalone-metastore/src/main/sql/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql
index 0b5f8a4..d79ad49 100644
--- a/standalone-metastore/src/main/sql/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql
+++ b/standalone-metastore/src/main/sql/mssql/upgrade-2.3.0-to-3.0.0.mssql.sql
@@ -214,3 +214,15 @@ ALTER TABLE COMPLETED_TXN_COMPONENTS ADD CTC_WRITEID bigint;
 ALTER TABLE KEY_CONSTRAINTS ADD DEFAULT_VALUE VARCHAR(400);
 
 ALTER TABLE HIVE_LOCKS MODIFY ALTER COLUMN HL_TXNID bigint NOT NULL;
+
+-- HIVE-18747
+CREATE TABLE MIN_HISTORY_LEVEL (
+  MHL_TXNID bigint NOT NULL,
+  MHL_MIN_OPEN_TXNID bigint NOT NULL,
+PRIMARY KEY CLUSTERED
+(
+  MHL_TXNID ASC
+)
+);
+
+CREATE INDEX MIN_HISTORY_LEVEL_IDX ON MIN_HISTORY_LEVEL (MHL_MIN_OPEN_TXNID);
diff --git a/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql b/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql
index 3e2db2a..fa76282 100644
--- a/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql
+++ b/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql
@@ -1089,6 +1089,14 @@ CREATE TABLE NEXT_WRITE_ID (
 
 CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
 
+CREATE TABLE MIN_HISTORY_LEVEL (
+  MHL_TXNID bigint NOT NULL,
+  MHL_MIN_OPEN_TXNID bigint NOT NULL,
+  PRIMARY KEY(MHL_TXNID)
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+CREATE INDEX MIN_HISTORY_LEVEL_IDX ON MIN_HISTORY_LEVEL (MHL_MIN_OPEN_TXNID);
+
 CREATE TABLE `I_SCHEMA` (
   `SCHEMA_ID` BIGINT PRIMARY KEY,
   `SCHEMA_TYPE` INTEGER NOT NULL,
diff --git a/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql b/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql
index d7c49e4..ae83b0d 100644
--- a/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql
+++ b/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql
@@ -178,7 +178,7 @@ CREATE TABLE TXN_TO_WRITE_ID (
   T2W_DATABASE varchar(128) NOT NULL,
   T2W_TABLE varchar(256) NOT NULL,
   T2W_WRITEID bigint NOT NULL
-);
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
 
 CREATE UNIQUE INDEX TBL_TO_TXN_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID);
 CREATE UNIQUE INDEX TBL_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_WRITEID);
@@ -187,7 +187,7 @@ CREATE TABLE NEXT_WRITE_ID (
   NWI_DATABASE varchar(128) NOT NULL,
   NWI_TABLE varchar(256) NOT NULL,
   NWI_NEXT bigint NOT NULL
-);
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
 
 CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
 
@@ -204,3 +204,12 @@ ALTER TABLE COMPLETED_TXN_COMPONENTS ADD CTC_WRITEID bigint;
 ALTER TABLE `KEY_CONSTRAINTS` ADD COLUMN `DEFAULT_VALUE` VARCHAR(400);
 
 ALTER TABLE `HIVE_LOCKS` MODIFY COLUMN `HL_TXNID` NOT NULL;
+
+-- HIVE-18747
+CREATE TABLE MIN_HISTORY_LEVEL (
+  MHL_TXNID bigint NOT NULL,
+  MHL_MIN_OPEN_TXNID bigint NOT NULL,
+  PRIMARY KEY(MHL_TXNID)
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+CREATE INDEX MIN_HISTORY_LEVEL_IDX ON MIN_HISTORY_LEVEL (MHL_MIN_OPEN_TXNID);
diff --git a/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql b/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql
index 09c40ad..e9e7f81 100644
--- a/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql
+++ b/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql
@@ -1062,6 +1062,14 @@ CREATE TABLE NEXT_WRITE_ID (
 
 CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
 
+CREATE TABLE MIN_HISTORY_LEVEL (
+  MHL_TXNID number(19) NOT NULL,
+  MHL_MIN_OPEN_TXNID number(19) NOT NULL,
+  PRIMARY KEY(MHL_TXNID)
+);
+
+CREATE INDEX MIN_HISTORY_LEVEL_IDX ON MIN_HISTORY_LEVEL (MHL_MIN_OPEN_TXNID);
+
 CREATE TABLE "I_SCHEMA" (
   "SCHEMA_ID" number primary key,
   "SCHEMA_TYPE" number not null,
diff --git a/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql b/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql
index 51eff3e..8f99822 100644
--- a/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql
+++ b/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql
@@ -224,3 +224,12 @@ ALTER TABLE COMPLETED_TXN_COMPONENTS ADD CTC_WRITEID number(19);
 ALTER TABLE KEY_CONSTRAINTS ADD DEFAULT_VALUE VARCHAR(400);
 
 ALTER TABLE HIVE_LOCKS MODIFY(HL_TXNID NOT NULL);
+
+-- HIVE-18747
+CREATE TABLE MIN_HISTORY_LEVEL (
+  MHL_TXNID number(19) NOT NULL,
+  MHL_MIN_OPEN_TXNID number(19) NOT NULL,
+  PRIMARY KEY(MHL_TXNID)
+);
+
+CREATE INDEX MIN_HISTORY_LEVEL_IDX ON MIN_HISTORY_LEVEL (MHL_MIN_OPEN_TXNID);
diff --git a/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql b/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql
index 69317b0..4f67d91 100644
--- a/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql
+++ b/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql
@@ -1754,6 +1754,14 @@ CREATE TABLE NEXT_WRITE_ID (
 
 CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE);
 
+CREATE TABLE MIN_HISTORY_LEVEL (
+  MHL_TXNID bigint NOT NULL,
+  MHL_MIN_OPEN_TXNID bigint NOT NULL,
+  PRIMARY KEY(MHL_TXNID)
+);
+
+CREATE INDEX MIN_HISTORY_LEVEL_IDX ON MIN_HISTORY_LEVEL (MHL_MIN_OPEN_TXNID);
+
 CREATE TABLE "I_SCHEMA" (
   "SCHEMA_ID" bigint primary key,
   "SCHEMA_TYPE" integer not null,
diff --git a/standalone-metastore/src/main/sql/postgres/upgrade-2.3.0-to-3.0.0.postgres.sql b/standalone-metastore/src/main/sql/postgres/upgrade-2.3.0-to-3.0.0.postgres.sql
index 2766568..50be127 100644
--- a/standalone-metastore/src/main/sql/postgres/upgrade-2.3.0-to-3.0.0.postgres.sql
+++ b/standalone-metastore/src/main/sql/postgres/upgrade-2.3.0-to-3.0.0.postgres.sql
@@ -239,3 +239,12 @@ ALTER TABLE COMPLETED_TXN_COMPONENTS ADD CTC_WRITEID bigint;
 ALTER TABLE "KEY_CONSTRAINTS" ADD COLUMN "DEFAULT_VALUE" VARCHAR(400);
 
 ALTER TABLE HIVE_LOCKS ALTER COLUMN HL_TXNID SET NOT NULL;
+
+-- HIVE-18747
+CREATE TABLE MIN_HISTORY_LEVEL (
+  MHL_TXNID bigint NOT NULL,
+  MHL_MIN_OPEN_TXNID bigint NOT NULL,
+  PRIMARY KEY(MHL_TXNID)
+);
+
+CREATE INDEX MIN_HISTORY_LEVEL_IDX ON MIN_HISTORY_LEVEL (MHL_MIN_OPEN_TXNID);