diff --git ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index 868da0c7a0..c50ff98651 100644
--- ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -77,6 +77,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -1754,6 +1755,66 @@ public void testReplAllocWriteId() throws Exception {
     assertFalse(failed);
   }
 
+  /**
+   * Races {@code allocateTableWriteIds} against a direct insert of the same NEXT_WRITE_ID row and checks that
+   * the allocation still hands out write IDs 1 and 2 to the two transactions opened for that round.
+   */
+  @Test
+  public void allocateNextWriteIdRetriesAfterConflictingConcurrentInsert() throws Exception {
+    String dbName = "abc";
+    String tableName = "def";
+    int numTxns = 2;
+    try (Connection dbConn = ((TxnHandler) txnHandler).getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         Statement stmt = dbConn.createStatement()) {
+      // repeat the race several times to raise the chance of a write-write conflict between the two threads
+      for (int i = 1; i <= 20; ++i) {
+        // make sure neither table holds a record for dbName.tableName
+        stmt.executeUpdate("TRUNCATE TABLE \"NEXT_WRITE_ID\"");
+        stmt.executeUpdate("TRUNCATE TABLE \"TXN_TO_WRITE_ID\"");
+        dbConn.commit();
+
+        OpenTxnsResponse resp = txnHandler.openTxns(new OpenTxnRequest(numTxns, "me", "localhost"));
+        AllocateTableWriteIdsRequest request = new AllocateTableWriteIdsRequest(dbName, tableName);
+        resp.getTxn_ids().forEach(request::addToTxnIds);
+
+        // thread 1: allocate write ID via txnHandler
+        CompletableFuture<AllocateTableWriteIdsResponse> future1 = CompletableFuture.supplyAsync(() -> {
+          try {
+            return txnHandler.allocateTableWriteIds(request);
+          } catch (Exception e) {
+            // rethrow so join() fails the test with the real cause; returning null would only show up as an NPE
+            throw new RuntimeException("allocateTableWriteIds(" + request + ") failed", e);
+          }
+        });
+
+        // thread 2: insert next_write_id record directly into DB
+        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
+          try {
+            Thread.sleep(10);
+            stmt.executeUpdate(String.format("INSERT INTO \"NEXT_WRITE_ID\" VALUES ('%s', '%s', 1)", dbName, tableName));
+            dbConn.commit();
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          } catch (Exception e) {
+            // best effort: this insert only exists to provoke the conflict, so a failure of the insert itself
+            // is printed and ignored instead of failing the test
+            e.printStackTrace();
+          }
+        });
+
+        CompletableFuture.allOf(future1, future2).join();
+
+        // validate that all write ID allocation attempts have succeeded
+        AllocateTableWriteIdsResponse result = future1.get();
+        assertEquals(numTxns, result.getTxnToWriteIds().size());
+        assertEquals(i * numTxns - 1, result.getTxnToWriteIds().get(0).getTxnId());
+        assertEquals(1, result.getTxnToWriteIds().get(0).getWriteId());
+        assertEquals(i * numTxns, result.getTxnToWriteIds().get(1).getTxnId());
+        assertEquals(2, result.getTxnToWriteIds().get(1).getWriteId());
+      }
+    }
+  }
+
   private void updateTxns(Connection conn) throws SQLException {
     Statement stmt = conn.createStatement();
     stmt.executeUpdate("UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = \"TXN_LAST_HEARTBEAT\" + 1");
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index d59f863b11..d2efc595a5 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -1097,12 +1097,7 @@ public void markFailed(CompactionInfo ci) throws MetaException {//todo: this sho
         LOG.warn("markFailed(" + ci.id + "):" + e.getMessage());
         LOG.debug("Going to rollback");
         rollbackDBConn(dbConn);
-        try {
-          checkRetryable(dbConn, e, "markFailed(" + ci + ")");
-        }
-        catch(MetaException ex) {
-          LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex));
-        }
+        checkRetryable(dbConn, e, "markFailed(" + ci + ")");
         LOG.error("markFailed(" + ci + ") failed: " + e.getMessage(), e);
} finally { close(rs, stmt, null); @@ -1132,12 +1127,7 @@ public void setHadoopJobId(String hadoopJobId, long id) { LOG.warn("setHadoopJobId(" + hadoopJobId + "," + id + "):" + e.getMessage()); LOG.debug("Going to rollback"); rollbackDBConn(dbConn); - try { - checkRetryable(dbConn, e, "setHadoopJobId(" + hadoopJobId + "," + id + ")"); - } - catch(MetaException ex) { - LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex)); - } + checkRetryable(dbConn, e, "setHadoopJobId(" + hadoopJobId + "," + id + ")"); LOG.error("setHadoopJobId(" + hadoopJobId + "," + id + ") failed: " + e.getMessage(), e); } finally { close(null, stmt, dbConn); diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index cf41ef8aaf..875b938271 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -271,6 +271,8 @@ "FROM \"HIVE_LOCKS\" WHERE \"HL_LOCK_EXT_ID\" = ?"; private static final String SELECT_TIMED_OUT_LOCKS_QUERY = "SELECT DISTINCT \"HL_LOCK_EXT_ID\" FROM \"HIVE_LOCKS\" " + "WHERE \"HL_LAST_HEARTBEAT\" < %s - ? 
AND \"HL_TXNID\" = 0";
+  private static final String WRITE_ID_INSERT_QUERY = "INSERT INTO \"TXN_TO_WRITE_ID\" (\"T2W_TXNID\", " +
+      "\"T2W_DATABASE\", \"T2W_TABLE\", \"T2W_WRITEID\") VALUES (?, ?, ?, ?)";
 
   private List transactionalListeners;
 
@@ -1649,7 +1651,6 @@ public void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaEx
       PreparedStatement pStmt = null;
       List insertPreparedStmts = null;
       ResultSet rs = null;
-      TxnStore.MutexAPI.LockHandle handle = null;
       List params = Arrays.asList(dbName, tblName);
       try {
         lockInternal();
@@ -1701,7 +1702,6 @@ public void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaEx
           int numAborts = abortTxns(dbConn, txnIds, false);
           assert(numAborts == numAbortedWrites);
         }
-        handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name());
 
         // There are some txns in the list which has no write id allocated and hence go ahead and do it.
         // Get the next write id for the given table and update it with new next write id.
@@ -1723,7 +1723,7 @@ public void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaEx
       } catch (SQLException e) {
         LOG.debug("Going to rollback");
         rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "replTableWriteIdState(" + rqst + ")");
+        checkRetryable(dbConn, e, "replTableWriteIdState(" + rqst + ")", true); // true: also retry on duplicate key
         throw new MetaException("Unable to update transaction database "
           + StringUtils.stringifyException(e));
       } finally {
@@ -1734,9 +1734,6 @@ public void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaEx
         }
         closeStmt(pStmt);
         close(rs, stmt, dbConn);
-        if (handle != null) {
-          handle.releaseLocks();
-        }
         unlockInternal();
       }
     } catch (RetryException e) {
@@ -1759,13 +1756,10 @@ public void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaEx
   }
 
   private List getAbortedWriteIds(ValidWriteIdList validWriteIdList) {
-    List abortedWriteIds = new ArrayList<>();
-    for (long writeId : validWriteIdList.getInvalidWriteIds()) {
-      if (validWriteIdList.isWriteIdAborted(writeId)) {
-        abortedWriteIds.add(writeId);
-      }
-    }
-    return abortedWriteIds;
+    return Arrays.stream(validWriteIdList.getInvalidWriteIds()) // start from the list's invalid write IDs
+        .filter(validWriteIdList::isWriteIdAborted) // keep only those the list reports as aborted
+        .boxed() // box the primitive IDs so they can be collected into a List
+        .collect(Collectors.toList());
   }
 
   private ValidTxnList getValidTxnList(Connection dbConn, String fullTableName, Long writeId) throws MetaException,
@@ -1955,17 +1949,13 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds
     String tblName = rqst.getTableName().toLowerCase();
     try {
       Connection dbConn = null;
-      Statement stmt = null;
       PreparedStatement pStmt = null;
-      List insertPreparedStmts = null;
       ResultSet rs = null;
-      TxnStore.MutexAPI.LockHandle handle = null;
       List txnToWriteIds = new ArrayList<>();
       List srcTxnToWriteIds = null;
       try {
         lockInternal();
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
 
         if (rqst.isSetReplPolicy()) {
           srcTxnToWriteIds = rqst.getSrcTxnToWriteIdList();
@@ -1974,7 +1964,7 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds
           assert (!rqst.isSetTxnIds());
           assert (!srcTxnToWriteIds.isEmpty());
 
-          for (TxnToWriteId txnToWriteId : srcTxnToWriteIds) {
+          for (TxnToWriteId txnToWriteId : srcTxnToWriteIds) {
             srcTxnIds.add(txnToWriteId.getTxnId());
           }
           txnIds = getTargetTxnIdList(rqst.getReplPolicy(), srcTxnIds, dbConn);
@@ -1992,21 +1982,23 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds
         }
 
         //Easiest check since we can't differentiate do we handle singleton list or list with multiple txn ids.
-        if(txnIds.size() > 1) {
+        if (txnIds.size() > 1) {
           Collections.sort(txnIds); //easier to read logs and for assumption done in replication flow
         }
 
         // Check if all the input txns are in valid state.
         // Write IDs should be allocated only for open and not read-only transactions.
- if (!isTxnsOpenAndNotReadOnly(txnIds, stmt)) { - String errorMsg = "Write ID allocation on " + TableName.getDbTable(dbName, tblName) - + " failed for input txns: " - + getAbortedAndReadOnlyTxns(txnIds, stmt) - + getCommittedTxns(txnIds, stmt); - LOG.error(errorMsg); - - throw new IllegalStateException("Write ID allocation failed on " + TableName.getDbTable(dbName, tblName) - + " as not all input txns in open state or read-only"); + try (Statement stmt = dbConn.createStatement()) { + if (!isTxnsOpenAndNotReadOnly(txnIds, stmt)) { + String errorMsg = "Write ID allocation on " + TableName.getDbTable(dbName, tblName) + + " failed for input txns: " + + getAbortedAndReadOnlyTxns(txnIds, stmt) + + getCommittedTxns(txnIds, stmt); + LOG.error(errorMsg); + + throw new IllegalStateException("Write ID allocation failed on " + TableName.getDbTable(dbName, tblName) + + " as not all input txns in open state or read-only"); + } } List queries = new ArrayList<>(); @@ -2017,14 +2009,13 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds // write id for the same db.table. If yes, then need to reuse it else have to allocate new one // The write id would have been already allocated in case of multi-statement txns where // first write on a table will allocate write id and rest of the writes should re-use it. - prefix.append("SELECT \"T2W_TXNID\", \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" WHERE" - + " \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ?" + " AND "); + prefix.append("SELECT \"T2W_TXNID\", \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" WHERE") + .append(" \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? 
AND "); TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnIds, "\"T2W_TXNID\"", false, false); long allocatedTxnsCount = 0; - long txnId; - long writeId = 0; + long writeId; List params = Arrays.asList(dbName, tblName); for (String query : queries) { pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, query, params); @@ -2033,7 +2024,7 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds rs = pStmt.executeQuery(); while (rs.next()) { // If table write ID is already allocated for the given transaction, then just use it - txnId = rs.getLong(1); + long txnId = rs.getLong(1); writeId = rs.getLong(2); txnToWriteIds.add(new TxnToWriteId(txnId, writeId)); allocatedTxnsCount++; @@ -2054,17 +2045,17 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds long srcWriteId = 0; if (rqst.isSetReplPolicy()) { // In replication flow, we always need to allocate write ID equal to that of source. - assert(srcTxnToWriteIds != null); + assert (srcTxnToWriteIds != null); srcWriteId = srcTxnToWriteIds.get(0).getWriteId(); } - handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name()); // There are some txns in the list which does not have write id allocated and hence go ahead and do it. // Get the next write id for the given table and update it with new next write id. // This is select for update query which takes a lock if the table entry is already there in NEXT_WRITE_ID String s = sqlGenerator.addForUpdateClause( - "SELECT \"NWI_NEXT\" FROM \"NEXT_WRITE_ID\" WHERE \"NWI_DATABASE\" = ? AND \"NWI_TABLE\" = ?"); + "SELECT \"NWI_NEXT\" FROM \"NEXT_WRITE_ID\" " + + "WHERE \"NWI_DATABASE\" = ? 
AND \"NWI_TABLE\" = ?"); closeStmt(pStmt); pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">", @@ -2076,7 +2067,7 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds // For repl flow, we need to force set the incoming write id. writeId = (srcWriteId > 0) ? srcWriteId : 1; s = "INSERT INTO \"NEXT_WRITE_ID\" (\"NWI_DATABASE\", \"NWI_TABLE\", \"NWI_NEXT\") VALUES (?, ?, " - + Long.toString(writeId + numOfWriteIds) + ")"; + + (writeId + numOfWriteIds) + ")"; closeStmt(pStmt); pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); LOG.debug("Going to execute insert <" + s.replaceAll("\\?", "{}") + ">", @@ -2087,7 +2078,7 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds writeId = (srcWriteId > 0) ? srcWriteId : nextWriteId; // Update the NEXT_WRITE_ID for the given table after incrementing by number of write ids allocated - s = "UPDATE \"NEXT_WRITE_ID\" SET \"NWI_NEXT\" = " + Long.toString(writeId + numOfWriteIds) + s = "UPDATE \"NEXT_WRITE_ID\" SET \"NWI_NEXT\" = " + (writeId + numOfWriteIds) + " WHERE \"NWI_DATABASE\" = ? 
AND \"NWI_TABLE\" = ?"; closeStmt(pStmt); pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); @@ -2109,24 +2100,30 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds } } - // Map the newly allocated write ids against the list of txns which doesn't have pre-allocated - // write ids - List rows = new ArrayList<>(); - List> paramsList = new ArrayList<>(); - for (long txn : txnIds) { - rows.add(txn + ", ?, ?, " + writeId); - txnToWriteIds.add(new TxnToWriteId(txn, writeId)); - paramsList.add(params); - LOG.info("Allocated writeID: " + writeId + " for txnId: " + txn); - writeId++; - } + // Map the newly allocated write ids against the list of txns which doesn't have pre-allocated write ids + try (PreparedStatement pstmt = dbConn.prepareStatement(WRITE_ID_INSERT_QUERY)) { + int insertCounter = 0; + for (long txnId : txnIds) { + pstmt.setLong(1, txnId); + pstmt.setString(2, dbName); + pstmt.setString(3, tblName); + pstmt.setLong(4, writeId); + pstmt.addBatch(); - // Insert entries to TXN_TO_WRITE_ID for newly allocated write ids - insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, - "\"TXN_TO_WRITE_ID\" (\"T2W_TXNID\", \"T2W_DATABASE\", \"T2W_TABLE\", \"T2W_WRITEID\")", rows, - paramsList); - for (PreparedStatement pst : insertPreparedStmts) { - pst.execute(); + txnToWriteIds.add(new TxnToWriteId(txnId, writeId)); + LOG.info("Allocated writeID: " + writeId + " for txnId: " + txnId); + writeId++; + insertCounter++; + if (insertCounter % maxBatchSize == 0) { + LOG.debug("Executing a batch of <" + WRITE_ID_INSERT_QUERY + "> queries. Batch size: " + maxBatchSize); + pstmt.executeBatch(); + } + } + if (insertCounter % maxBatchSize != 0) { + LOG.debug("Executing a batch of <" + WRITE_ID_INSERT_QUERY + "> queries. 
" + + "Batch size: " + insertCounter % maxBatchSize); + pstmt.executeBatch(); + } } if (transactionalListeners != null) { @@ -2136,27 +2133,17 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds dbConn, sqlGenerator); } - LOG.info("Allocated write ids for the table: " + dbName + "." + tblName); - LOG.debug("Going to commit"); + LOG.info("Allocated writeIDs for dbName={}, tblName={} (txnIds: {})", dbName, tblName, rqst.getTxnIds()); dbConn.commit(); return new AllocateTableWriteIdsResponse(txnToWriteIds); } catch (SQLException e) { - LOG.debug("Going to rollback"); + LOG.error("Exception during writeID allocation for request={}. Will retry if possible.", rqst, e); rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "allocateTableWriteIds(" + rqst + ")"); + checkRetryable(dbConn, e, "allocateTableWriteIds(" + rqst + ")", true); throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { - if (insertPreparedStmts != null) { - for (PreparedStatement pst : insertPreparedStmts) { - closeStmt(pst); - } - } - closeStmt(pStmt); - close(rs, stmt, dbConn); - if(handle != null) { - handle.releaseLocks(); - } + close(rs, pStmt, dbConn); unlockInternal(); } } catch (RetryException e) { @@ -2169,12 +2156,10 @@ public void seedWriteIdOnAcidConversion(InitializeTableWriteIdsRequest rqst) try { Connection dbConn = null; PreparedStatement pst = null; - TxnStore.MutexAPI.LockHandle handle = null; try { lockInternal(); dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name()); //since this is on conversion from non-acid to acid, NEXT_WRITE_ID should not have an entry //for this table. 
It also has a unique index in case 'should not' is violated
 
@@ -2197,9 +2182,6 @@ public void seedWriteIdOnAcidConversion(InitializeTableWriteIdsRequest rqst)
           + StringUtils.stringifyException(e));
       } finally {
         close(null, pst, dbConn);
-        if(handle != null) {
-          handle.releaseLocks();
-        }
         unlockInternal();
       }
     } catch (RetryException e) {
@@ -4106,6 +4088,16 @@ private boolean waitForRetry(String caller, String errMsg) {
     }
     return false;
   }
+
+  /**
+   * Calls {@link #checkRetryable(Connection, SQLException, String, boolean)} with {@code retryOnDuplicateKey = false}.
+   */
+  protected void checkRetryable(Connection conn,
+                                SQLException e,
+                                String caller) throws RetryException {
+    checkRetryable(conn, e, caller, false); // false: a duplicate-key error alone does not trigger a retry
+  }
+
   /**
    * Determine if an exception was such that it makes sense to retry.  Unfortunately there is no standard way to do
    * this, so we have to inspect the error messages and catch the telltale signs for each
@@ -4114,11 +4106,13 @@ private boolean waitForRetry(String caller, String errMsg) {
    * @param conn database connection
    * @param e exception that was thrown.
    * @param caller name of the method calling this (and other info useful to log)
+   * @param retryOnDuplicateKey whether a unique key constraint violation should also trigger a retry
    * @throws org.apache.hadoop.hive.metastore.txn.TxnHandler.RetryException when the operation should be retried
    */
   protected void checkRetryable(Connection conn,
                                 SQLException e,
-                                String caller) throws RetryException, MetaException {
+                                String caller,
+                                boolean retryOnDuplicateKey) throws RetryException {
 
     // If you change this function, remove the @Ignore from TestTxnHandler.deadlockIsDetected()
     // to test these changes.
@@ -4150,6 +4144,8 @@ protected void checkRetryable(Connection conn, } else if (isRetryable(conf, e)) { //in MSSQL this means Communication Link Failure sendRetrySignal = waitForRetry(caller, e.getMessage()); + } else if (retryOnDuplicateKey && isDuplicateKeyError(e)) { + sendRetrySignal = waitForRetry(caller, e.getMessage()); } else { //make sure we know we saw an error that we don't recognize @@ -5210,7 +5206,7 @@ private boolean isDuplicateKeyError(SQLException ex) { break; case SQLSERVER: //2627 is unique constaint violation incl PK, 2601 - unique key - if(ex.getErrorCode() == 2627 && "23000".equals(ex.getSQLState())) { + if ((ex.getErrorCode() == 2627 || ex.getErrorCode() == 2601) && "23000".equals(ex.getSQLState())) { return true; } break; diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index 1e177f4a7b..0825b40461 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -46,7 +46,7 @@ enum MUTEX_KEY { Initiator, Cleaner, HouseKeeper, CheckLock, TxnCleaner, - CompactionScheduler, WriteIdAllocator, MaterializationRebuild + CompactionScheduler, MaterializationRebuild } // Compactor states (Should really be enum) String INITIATED_RESPONSE = "initiated";