diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index d59f863b11..d2efc595a5 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -1097,12 +1097,7 @@ public void markFailed(CompactionInfo ci) throws MetaException {//todo: this sho
       LOG.warn("markFailed(" + ci.id + "):" + e.getMessage());
       LOG.debug("Going to rollback");
       rollbackDBConn(dbConn);
-      try {
-        checkRetryable(dbConn, e, "markFailed(" + ci + ")");
-      }
-      catch(MetaException ex) {
-        LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex));
-      }
+      checkRetryable(dbConn, e, "markFailed(" + ci + ")");
       LOG.error("markFailed(" + ci + ") failed: " + e.getMessage(), e);
     } finally {
       close(rs, stmt, null);
@@ -1132,12 +1127,7 @@ public void setHadoopJobId(String hadoopJobId, long id) {
       LOG.warn("setHadoopJobId(" + hadoopJobId + "," + id + "):" + e.getMessage());
       LOG.debug("Going to rollback");
       rollbackDBConn(dbConn);
-      try {
-        checkRetryable(dbConn, e, "setHadoopJobId(" + hadoopJobId + "," + id + ")");
-      }
-      catch(MetaException ex) {
-        LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(ex));
-      }
+      checkRetryable(dbConn, e, "setHadoopJobId(" + hadoopJobId + "," + id + ")");
       LOG.error("setHadoopJobId(" + hadoopJobId + "," + id + ") failed: " + e.getMessage(), e);
     } finally {
       close(null, stmt, dbConn);
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 8fded608d0..0c279b0334 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -271,6 +271,8 @@
       "FROM \"HIVE_LOCKS\" WHERE \"HL_LOCK_EXT_ID\" = ?";
   private static final String SELECT_TIMED_OUT_LOCKS_QUERY = "SELECT DISTINCT \"HL_LOCK_EXT_ID\" FROM \"HIVE_LOCKS\" " +
       "WHERE \"HL_LAST_HEARTBEAT\" < %s - ? AND \"HL_TXNID\" = 0";
+  private static final String WRITE_ID_INSERT_QUERY = "INSERT INTO \"TXN_TO_WRITE_ID\" (\"T2W_TXNID\", " +
+      "\"T2W_DATABASE\", \"T2W_TABLE\", \"T2W_WRITEID\") VALUES (?, ?, ?, ?)";
 
   private List<TransactionalMetaStoreEventListener> transactionalListeners;
 
@@ -1647,7 +1649,6 @@ public void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaEx
       PreparedStatement pStmt = null;
       List<PreparedStatement> insertPreparedStmts = null;
       ResultSet rs = null;
-      TxnStore.MutexAPI.LockHandle handle = null;
       List<String> params = Arrays.asList(dbName, tblName);
       try {
         lockInternal();
@@ -1699,7 +1700,6 @@ public void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaEx
           int numAborts = abortTxns(dbConn, txnIds, false);
           assert(numAborts == numAbortedWrites);
         }
-        handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name());
         // There are some txns in the list which has no write id allocated and hence go ahead and do it.
         // Get the next write id for the given table and update it with new next write id.
@@ -1721,7 +1721,7 @@ public void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaEx
       } catch (SQLException e) {
         LOG.debug("Going to rollback");
         rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "replTableWriteIdState(" + rqst + ")");
+        checkRetryable(dbConn, e, "replTableWriteIdState(" + rqst + ")", true);
         throw new MetaException("Unable to update transaction database "
             + StringUtils.stringifyException(e));
       } finally {
@@ -1732,9 +1732,6 @@ public void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaEx
         }
         closeStmt(pStmt);
         close(rs, stmt, dbConn);
-        if (handle != null) {
-          handle.releaseLocks();
-        }
         unlockInternal();
       }
     } catch (RetryException e) {
@@ -1757,13 +1754,10 @@ public void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaEx
   }
 
   private List<Long> getAbortedWriteIds(ValidWriteIdList validWriteIdList) {
-    List<Long> abortedWriteIds = new ArrayList<>();
-    for (long writeId : validWriteIdList.getInvalidWriteIds()) {
-      if (validWriteIdList.isWriteIdAborted(writeId)) {
-        abortedWriteIds.add(writeId);
-      }
-    }
-    return abortedWriteIds;
+    return Arrays.stream(validWriteIdList.getInvalidWriteIds())
+        .filter(validWriteIdList::isWriteIdAborted)
+        .boxed()
+        .collect(Collectors.toList());
   }
 
   private ValidTxnList getValidTxnList(Connection dbConn, String fullTableName, Long writeId) throws MetaException,
@@ -1953,17 +1947,13 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds
     String tblName = rqst.getTableName().toLowerCase();
     try {
       Connection dbConn = null;
-      Statement stmt = null;
       PreparedStatement pStmt = null;
-      List<PreparedStatement> insertPreparedStmts = null;
       ResultSet rs = null;
-      TxnStore.MutexAPI.LockHandle handle = null;
       List<TxnToWriteId> txnToWriteIds = new ArrayList<>();
       List<TxnToWriteId> srcTxnToWriteIds = null;
       try {
         lockInternal();
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
 
         if (rqst.isSetReplPolicy()) {
           srcTxnToWriteIds = rqst.getSrcTxnToWriteIdList();
@@ -1972,7 +1962,7 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds
           assert (!rqst.isSetTxnIds());
           assert (!srcTxnToWriteIds.isEmpty());
 
-          for (TxnToWriteId txnToWriteId : srcTxnToWriteIds) {
+          for (TxnToWriteId txnToWriteId : srcTxnToWriteIds) {
            srcTxnIds.add(txnToWriteId.getTxnId());
          }
          txnIds = getTargetTxnIdList(rqst.getReplPolicy(), srcTxnIds, dbConn);
@@ -1990,21 +1980,23 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds
         }
 
         //Easiest check since we can't differentiate do we handle singleton list or list with multiple txn ids.
-        if(txnIds.size() > 1) {
+        if (txnIds.size() > 1) {
           Collections.sort(txnIds); //easier to read logs and for assumption done in replication flow
         }
 
         // Check if all the input txns are in valid state.
         // Write IDs should be allocated only for open and not read-only transactions.
-        if (!isTxnsOpenAndNotReadOnly(txnIds, stmt)) {
-          String errorMsg = "Write ID allocation on " + TableName.getDbTable(dbName, tblName)
-              + " failed for input txns: "
-              + getAbortedAndReadOnlyTxns(txnIds, stmt)
-              + getCommittedTxns(txnIds, stmt);
-          LOG.error(errorMsg);
-
-          throw new IllegalStateException("Write ID allocation failed on " + TableName.getDbTable(dbName, tblName)
-              + " as not all input txns in open state or read-only");
+        try (Statement stmt = dbConn.createStatement()) {
+          if (!isTxnsOpenAndNotReadOnly(txnIds, stmt)) {
+            String errorMsg = "Write ID allocation on " + TableName.getDbTable(dbName, tblName)
+                + " failed for input txns: "
+                + getAbortedAndReadOnlyTxns(txnIds, stmt)
+                + getCommittedTxns(txnIds, stmt);
+            LOG.error(errorMsg);
+
+            throw new IllegalStateException("Write ID allocation failed on " + TableName.getDbTable(dbName, tblName)
+                + " as not all input txns in open state or read-only");
+          }
         }
 
         List<String> queries = new ArrayList<>();
@@ -2015,14 +2007,13 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds
         // write id for the same db.table. If yes, then need to reuse it else have to allocate new one
         // The write id would have been already allocated in case of multi-statement txns where
         // first write on a table will allocate write id and rest of the writes should re-use it.
-        prefix.append("SELECT \"T2W_TXNID\", \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" WHERE"
-            + " \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ?" + " AND ");
+        prefix.append("SELECT \"T2W_TXNID\", \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" WHERE")
+            .append(" \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND ");
         TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnIds,
             "\"T2W_TXNID\"", false, false);
 
         long allocatedTxnsCount = 0;
-        long txnId;
-        long writeId = 0;
+        long writeId;
         List<String> params = Arrays.asList(dbName, tblName);
         for (String query : queries) {
           pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, query, params);
@@ -2031,7 +2022,7 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds
           rs = pStmt.executeQuery();
           while (rs.next()) {
             // If table write ID is already allocated for the given transaction, then just use it
-            txnId = rs.getLong(1);
+            long txnId = rs.getLong(1);
             writeId = rs.getLong(2);
             txnToWriteIds.add(new TxnToWriteId(txnId, writeId));
             allocatedTxnsCount++;
@@ -2052,17 +2043,17 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds
 
         long srcWriteId = 0;
         if (rqst.isSetReplPolicy()) {
           // In replication flow, we always need to allocate write ID equal to that of source.
-          assert(srcTxnToWriteIds != null);
+          assert (srcTxnToWriteIds != null);
           srcWriteId = srcTxnToWriteIds.get(0).getWriteId();
         }
 
-        handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name());
         // There are some txns in the list which does not have write id allocated and hence go ahead and do it.
         // Get the next write id for the given table and update it with new next write id.
         // This is select for update query which takes a lock if the table entry is already there in NEXT_WRITE_ID
         String s = sqlGenerator.addForUpdateClause(
-            "SELECT \"NWI_NEXT\" FROM \"NEXT_WRITE_ID\" WHERE \"NWI_DATABASE\" = ? AND \"NWI_TABLE\" = ?");
+            "SELECT \"NWI_NEXT\" FROM \"NEXT_WRITE_ID\" " +
+                "WHERE \"NWI_DATABASE\" = ? AND \"NWI_TABLE\" = ?");
         closeStmt(pStmt);
         pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
         LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">",
@@ -2074,7 +2065,7 @@
           // For repl flow, we need to force set the incoming write id.
           writeId = (srcWriteId > 0) ? srcWriteId : 1;
           s = "INSERT INTO \"NEXT_WRITE_ID\" (\"NWI_DATABASE\", \"NWI_TABLE\", \"NWI_NEXT\") VALUES (?, ?, "
-              + Long.toString(writeId + numOfWriteIds) + ")";
+              + (writeId + numOfWriteIds) + ")";
           closeStmt(pStmt);
           pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
           LOG.debug("Going to execute insert <" + s.replaceAll("\\?", "{}") + ">",
@@ -2085,7 +2076,7 @@
           writeId = (srcWriteId > 0) ? srcWriteId : nextWriteId;
 
           // Update the NEXT_WRITE_ID for the given table after incrementing by number of write ids allocated
-          s = "UPDATE \"NEXT_WRITE_ID\" SET \"NWI_NEXT\" = " + Long.toString(writeId + numOfWriteIds)
+          s = "UPDATE \"NEXT_WRITE_ID\" SET \"NWI_NEXT\" = " + (writeId + numOfWriteIds)
              + " WHERE \"NWI_DATABASE\" = ? AND \"NWI_TABLE\" = ?";
           closeStmt(pStmt);
           pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
@@ -2107,24 +2098,30 @@
          }
        }
 
-        // Map the newly allocated write ids against the list of txns which doesn't have pre-allocated
-        // write ids
-        List<String> rows = new ArrayList<>();
-        List<List<String>> paramsList = new ArrayList<>();
-        for (long txn : txnIds) {
-          rows.add(txn + ", ?, ?, " + writeId);
-          txnToWriteIds.add(new TxnToWriteId(txn, writeId));
-          paramsList.add(params);
-          LOG.info("Allocated writeID: " + writeId + " for txnId: " + txn);
-          writeId++;
-        }
+        // Map the newly allocated write ids against the list of txns which doesn't have pre-allocated write ids
+        try (PreparedStatement pstmt = dbConn.prepareStatement(WRITE_ID_INSERT_QUERY)) {
+          int insertCounter = 0;
+          for (long txnId : txnIds) {
+            pstmt.setLong(1, txnId);
+            pstmt.setString(2, dbName);
+            pstmt.setString(3, tblName);
+            pstmt.setLong(4, writeId);
+            pstmt.addBatch();
 
-        // Insert entries to TXN_TO_WRITE_ID for newly allocated write ids
-        insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
-            "\"TXN_TO_WRITE_ID\" (\"T2W_TXNID\", \"T2W_DATABASE\", \"T2W_TABLE\", \"T2W_WRITEID\")", rows,
-            paramsList);
-        for (PreparedStatement pst : insertPreparedStmts) {
-          pst.execute();
+            txnToWriteIds.add(new TxnToWriteId(txnId, writeId));
+            LOG.info("Allocated writeID: " + writeId + " for txnId: " + txnId);
+            writeId++;
+            insertCounter++;
+            if (insertCounter % maxBatchSize == 0) {
+              LOG.debug("Executing a batch of <" + WRITE_ID_INSERT_QUERY + "> queries. Batch size: " + maxBatchSize);
+              pstmt.executeBatch();
+            }
+          }
+          if (insertCounter % maxBatchSize != 0) {
+            LOG.debug("Executing a batch of <" + WRITE_ID_INSERT_QUERY + "> queries. " +
+                "Batch size: " + insertCounter % maxBatchSize);
+            pstmt.executeBatch();
+          }
         }
 
         if (transactionalListeners != null) {
@@ -2134,27 +2131,17 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds
               dbConn, sqlGenerator);
         }
 
-        LOG.info("Allocated write ids for the table: " + dbName + "." + tblName);
-        LOG.debug("Going to commit");
+        LOG.info("Allocated writeIDs for dbName={}, tblName={} (txnIds: {})", dbName, tblName, rqst.getTxnIds());
         dbConn.commit();
         return new AllocateTableWriteIdsResponse(txnToWriteIds);
       } catch (SQLException e) {
-        LOG.debug("Going to rollback");
+        LOG.error("Exception during writeID allocation for request={}. Will retry if possible.", rqst, e);
         rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "allocateTableWriteIds(" + rqst + ")");
+        checkRetryable(dbConn, e, "allocateTableWriteIds(" + rqst + ")", true);
         throw new MetaException("Unable to update transaction database "
             + StringUtils.stringifyException(e));
       } finally {
-        if (insertPreparedStmts != null) {
-          for (PreparedStatement pst : insertPreparedStmts) {
-            closeStmt(pst);
-          }
-        }
-        closeStmt(pStmt);
-        close(rs, stmt, dbConn);
-        if(handle != null) {
-          handle.releaseLocks();
-        }
+        close(rs, pStmt, dbConn);
         unlockInternal();
       }
     } catch (RetryException e) {
@@ -2167,12 +2154,10 @@ public void seedWriteIdOnAcidConversion(InitializeTableWriteIdsRequest rqst)
     try {
       Connection dbConn = null;
       PreparedStatement pst = null;
-      TxnStore.MutexAPI.LockHandle handle = null;
      try {
        lockInternal();
        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name());
 
        //since this is on conversion from non-acid to acid, NEXT_WRITE_ID should not have an entry
        //for this table.  It also has a unique index in case 'should not' is violated
@@ -2195,9 +2180,6 @@ public void seedWriteIdOnAcidConversion(InitializeTableWriteIdsRequest rqst)
            + StringUtils.stringifyException(e));
      } finally {
        close(null, pst, dbConn);
-        if(handle != null) {
-          handle.releaseLocks();
-        }
        unlockInternal();
      }
    } catch (RetryException e) {
@@ -4105,6 +4087,16 @@ private boolean waitForRetry(String caller, String errMsg) {
     }
     return false;
   }
+
+  /**
+   * See {@link #checkRetryable(Connection, SQLException, String, boolean)}.
+   */
+  protected void checkRetryable(Connection conn,
+                                SQLException e,
+                                String caller) throws RetryException {
+    checkRetryable(conn, e, caller, false);
+  }
+
   /**
    * Determine if an exception was such that it makes sense to retry.  Unfortunately there is no standard way to do
    * this, so we have to inspect the error messages and catch the telltale signs for each
@@ -4113,11 +4105,13 @@ private boolean waitForRetry(String caller, String errMsg) {
   * @param conn database connection
   * @param e exception that was thrown.
   * @param caller name of the method calling this (and other info useful to log)
+   * @param retryOnDuplicateKey whether to retry on unique key constraint violation
   * @throws org.apache.hadoop.hive.metastore.txn.TxnHandler.RetryException when the operation should be retried
   */
  protected void checkRetryable(Connection conn,
                                SQLException e,
-                                String caller) throws RetryException, MetaException {
+                                String caller,
+                                boolean retryOnDuplicateKey) throws RetryException {
    // If you change this function, remove the @Ignore from TestTxnHandler.deadlockIsDetected()
    // to test these changes.
@@ -4149,6 +4143,8 @@ protected void checkRetryable(Connection conn,
     } else if (isRetryable(conf, e)) {
       //in MSSQL this means Communication Link Failure
       sendRetrySignal = waitForRetry(caller, e.getMessage());
+    } else if (retryOnDuplicateKey && isDuplicateKeyError(e)) {
+      sendRetrySignal = waitForRetry(caller, e.getMessage());
     } else {
       //make sure we know we saw an error that we don't recognize
@@ -5209,7 +5205,7 @@ private boolean isDuplicateKeyError(SQLException ex) {
         break;
       case SQLSERVER:
         //2627 is unique constaint violation incl PK, 2601 - unique key
-        if(ex.getErrorCode() == 2627 && "23000".equals(ex.getSQLState())) {
+        if ((ex.getErrorCode() == 2627 || ex.getErrorCode() == 2601) && "23000".equals(ex.getSQLState())) {
          return true;
        }
        break;
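
Note (not part of the patch): the central change above replaces the row-string based `createInsertValuesPreparedStmt` insert into TXN_TO_WRITE_ID with a single parameterized statement executed in batches of `maxBatchSize`. The sketch below shows that JDBC batching pattern in isolation; the SQL and column names are taken from the patch, while the class name, method signature and the way `maxBatchSize` is passed in are illustrative assumptions, not TxnHandler code.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

// Minimal sketch of the batched write-id insert pattern adopted by the patch.
public class WriteIdBatchInsertSketch {
  private static final String WRITE_ID_INSERT_QUERY =
      "INSERT INTO \"TXN_TO_WRITE_ID\" (\"T2W_TXNID\", \"T2W_DATABASE\", \"T2W_TABLE\", \"T2W_WRITEID\") "
          + "VALUES (?, ?, ?, ?)";

  static void insertWriteIds(Connection dbConn, String dbName, String tblName,
      List<Long> txnIds, long firstWriteId, int maxBatchSize) throws SQLException {
    long writeId = firstWriteId;
    try (PreparedStatement pstmt = dbConn.prepareStatement(WRITE_ID_INSERT_QUERY)) {
      int insertCounter = 0;
      for (long txnId : txnIds) {
        // One parameter set per txn; consecutive write ids are handed out in order.
        pstmt.setLong(1, txnId);
        pstmt.setString(2, dbName);
        pstmt.setString(3, tblName);
        pstmt.setLong(4, writeId++);
        pstmt.addBatch();
        insertCounter++;
        // Flush a full batch so a large txn list never accumulates into one huge statement.
        if (insertCounter % maxBatchSize == 0) {
          pstmt.executeBatch();
        }
      }
      // Flush the remainder if the last batch was not full.
      if (insertCounter % maxBatchSize != 0) {
        pstmt.executeBatch();
      }
    }
  }
}
```

Batching a single parameterized INSERT bounds the per-statement parameter count and avoids building per-row value strings, which is what the `insertCounter % maxBatchSize` checks in the patched `allocateTableWriteIds` accomplish.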
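Note (not part of the patch): `checkRetryable` now takes a `retryOnDuplicateKey` flag, and `isDuplicateKeyError` treats SQL Server error codes 2627 and 2601 with SQLSTATE 23000 as duplicate-key violations, so callers like `allocateTableWriteIds` can retry when two allocators race on the same key now that the WriteIdAllocator mutex is gone. The sketch below is a generic retry loop built on that idea; it does not reproduce TxnHandler's `RetryException`/`waitForRetry` machinery, and the retry count and backoff are made-up values.

```java
import java.sql.SQLException;
import java.util.concurrent.Callable;

// Minimal sketch of retry-on-duplicate-key, assuming SQL Server error codes only.
public class DuplicateKeyRetrySketch {

  static boolean isSqlServerDuplicateKey(SQLException ex) {
    // 2627: unique/PK constraint violation, 2601: unique index violation (per the patch).
    return (ex.getErrorCode() == 2627 || ex.getErrorCode() == 2601)
        && "23000".equals(ex.getSQLState());
  }

  static <T> T runWithRetry(Callable<T> work, int maxRetries) throws Exception {
    for (int attempt = 0; ; attempt++) {
      try {
        return work.call();
      } catch (SQLException e) {
        // The loser of a concurrent-insert race sees a duplicate-key error and
        // simply retries the whole unit of work instead of failing the request.
        if (attempt < maxRetries && isSqlServerDuplicateKey(e)) {
          Thread.sleep(1000L * (attempt + 1)); // crude backoff
          continue;
        }
        throw e;
      }
    }
  }
}
```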