diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 8fded608d0..faf99317ab 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -271,6 +271,8 @@ "FROM \"HIVE_LOCKS\" WHERE \"HL_LOCK_EXT_ID\" = ?"; private static final String SELECT_TIMED_OUT_LOCKS_QUERY = "SELECT DISTINCT \"HL_LOCK_EXT_ID\" FROM \"HIVE_LOCKS\" " + "WHERE \"HL_LAST_HEARTBEAT\" < %s - ? AND \"HL_TXNID\" = 0"; + private static final String WRITE_ID_INSERT_QUERY = "INSERT INTO \"TXN_TO_WRITE_ID\" (\"T2W_TXNID\", " + + "\"T2W_DATABASE\", \"T2W_TABLE\", \"T2W_WRITEID\") VALUES (?, ?, ?, ?)"; private List transactionalListeners; @@ -1953,17 +1955,13 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds String tblName = rqst.getTableName().toLowerCase(); try { Connection dbConn = null; - Statement stmt = null; PreparedStatement pStmt = null; - List insertPreparedStmts = null; ResultSet rs = null; - TxnStore.MutexAPI.LockHandle handle = null; List txnToWriteIds = new ArrayList<>(); List srcTxnToWriteIds = null; try { lockInternal(); dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); if (rqst.isSetReplPolicy()) { srcTxnToWriteIds = rqst.getSrcTxnToWriteIdList(); @@ -1972,7 +1970,7 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds assert (!rqst.isSetTxnIds()); assert (!srcTxnToWriteIds.isEmpty()); - for (TxnToWriteId txnToWriteId : srcTxnToWriteIds) { + for (TxnToWriteId txnToWriteId : srcTxnToWriteIds) { srcTxnIds.add(txnToWriteId.getTxnId()); } txnIds = getTargetTxnIdList(rqst.getReplPolicy(), srcTxnIds, dbConn); @@ -1990,21 +1988,23 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds } //Easiest check since we can't differentiate do we handle singleton list or list with multiple txn ids. - if(txnIds.size() > 1) { + if (txnIds.size() > 1) { Collections.sort(txnIds); //easier to read logs and for assumption done in replication flow } // Check if all the input txns are in valid state. // Write IDs should be allocated only for open and not read-only transactions. - if (!isTxnsOpenAndNotReadOnly(txnIds, stmt)) { - String errorMsg = "Write ID allocation on " + TableName.getDbTable(dbName, tblName) - + " failed for input txns: " - + getAbortedAndReadOnlyTxns(txnIds, stmt) - + getCommittedTxns(txnIds, stmt); - LOG.error(errorMsg); - - throw new IllegalStateException("Write ID allocation failed on " + TableName.getDbTable(dbName, tblName) - + " as not all input txns in open state or read-only"); + try (Statement stmt = dbConn.createStatement()) { + if (!isTxnsOpenAndNotReadOnly(txnIds, stmt)) { + String errorMsg = "Write ID allocation on " + TableName.getDbTable(dbName, tblName) + + " failed for input txns: " + + getAbortedAndReadOnlyTxns(txnIds, stmt) + + getCommittedTxns(txnIds, stmt); + LOG.error(errorMsg); + + throw new IllegalStateException("Write ID allocation failed on " + TableName.getDbTable(dbName, tblName) + + " as not all input txns in open state or read-only"); + } } List queries = new ArrayList<>(); @@ -2015,14 +2015,13 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds // write id for the same db.table. If yes, then need to reuse it else have to allocate new one // The write id would have been already allocated in case of multi-statement txns where // first write on a table will allocate write id and rest of the writes should re-use it. - prefix.append("SELECT \"T2W_TXNID\", \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" WHERE" - + " \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ?" + " AND "); + prefix.append("SELECT \"T2W_TXNID\", \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" WHERE") + .append(" \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND "); TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnIds, "\"T2W_TXNID\"", false, false); long allocatedTxnsCount = 0; - long txnId; - long writeId = 0; + long writeId; List params = Arrays.asList(dbName, tblName); for (String query : queries) { pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, query, params); @@ -2031,7 +2030,7 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds rs = pStmt.executeQuery(); while (rs.next()) { // If table write ID is already allocated for the given transaction, then just use it - txnId = rs.getLong(1); + long txnId = rs.getLong(1); writeId = rs.getLong(2); txnToWriteIds.add(new TxnToWriteId(txnId, writeId)); allocatedTxnsCount++; @@ -2052,17 +2051,17 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds long srcWriteId = 0; if (rqst.isSetReplPolicy()) { // In replication flow, we always need to allocate write ID equal to that of source. - assert(srcTxnToWriteIds != null); + assert (srcTxnToWriteIds != null); srcWriteId = srcTxnToWriteIds.get(0).getWriteId(); } - handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name()); // There are some txns in the list which does not have write id allocated and hence go ahead and do it. // Get the next write id for the given table and update it with new next write id. // This is select for update query which takes a lock if the table entry is already there in NEXT_WRITE_ID String s = sqlGenerator.addForUpdateClause( - "SELECT \"NWI_NEXT\" FROM \"NEXT_WRITE_ID\" WHERE \"NWI_DATABASE\" = ? AND \"NWI_TABLE\" = ?"); + "SELECT \"NWI_NEXT\" FROM \"NEXT_WRITE_ID\" " + + "WHERE \"NWI_DATABASE\" = ? AND \"NWI_TABLE\" = ?"); closeStmt(pStmt); pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">", @@ -2074,7 +2073,7 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds // For repl flow, we need to force set the incoming write id. writeId = (srcWriteId > 0) ? srcWriteId : 1; s = "INSERT INTO \"NEXT_WRITE_ID\" (\"NWI_DATABASE\", \"NWI_TABLE\", \"NWI_NEXT\") VALUES (?, ?, " - + Long.toString(writeId + numOfWriteIds) + ")"; + + (writeId + numOfWriteIds) + ")"; closeStmt(pStmt); pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); LOG.debug("Going to execute insert <" + s.replaceAll("\\?", "{}") + ">", @@ -2085,7 +2084,7 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds writeId = (srcWriteId > 0) ? srcWriteId : nextWriteId; // Update the NEXT_WRITE_ID for the given table after incrementing by number of write ids allocated - s = "UPDATE \"NEXT_WRITE_ID\" SET \"NWI_NEXT\" = " + Long.toString(writeId + numOfWriteIds) + s = "UPDATE \"NEXT_WRITE_ID\" SET \"NWI_NEXT\" = " + (writeId + numOfWriteIds) + " WHERE \"NWI_DATABASE\" = ? AND \"NWI_TABLE\" = ?"; closeStmt(pStmt); pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params); @@ -2107,24 +2106,29 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds } } - // Map the newly allocated write ids against the list of txns which doesn't have pre-allocated - // write ids - List rows = new ArrayList<>(); - List> paramsList = new ArrayList<>(); - for (long txn : txnIds) { - rows.add(txn + ", ?, ?, " + writeId); - txnToWriteIds.add(new TxnToWriteId(txn, writeId)); - paramsList.add(params); - LOG.info("Allocated writeID: " + writeId + " for txnId: " + txn); - writeId++; - } + // Map the newly allocated write ids against the list of txns which doesn't have pre-allocated write ids + try (PreparedStatement pstmt = dbConn.prepareStatement(WRITE_ID_INSERT_QUERY)) { + int insertCounter = 0; + for (long txnId : txnIds) { + pstmt.setLong(1, txnId); + pstmt.setString(2, dbName); + pstmt.setString(3, tblName); + pstmt.setLong(4, writeId); + pstmt.addBatch(); - // Insert entries to TXN_TO_WRITE_ID for newly allocated write ids - insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, - "\"TXN_TO_WRITE_ID\" (\"T2W_TXNID\", \"T2W_DATABASE\", \"T2W_TABLE\", \"T2W_WRITEID\")", rows, - paramsList); - for (PreparedStatement pst : insertPreparedStmts) { - pst.execute(); + txnToWriteIds.add(new TxnToWriteId(txnId, writeId)); + LOG.info("Allocated writeID: " + writeId + " for txnId: " + txnId); + writeId++; + insertCounter++; + if (insertCounter % maxBatchSize == 0) { + LOG.debug("Executing a batch of <" + WRITE_ID_INSERT_QUERY + "> queries. Batch size: " + maxBatchSize); + pstmt.executeBatch(); + } + } + if (insertCounter % maxBatchSize != 0) { + LOG.debug("Executing a batch of <" + WRITE_ID_INSERT_QUERY + "> queries. Batch size: " + insertCounter % maxBatchSize); + pstmt.executeBatch(); + } } if (transactionalListeners != null) { @@ -2135,26 +2139,23 @@ public AllocateTableWriteIdsResponse allocateTableWriteIds(AllocateTableWriteIds } LOG.info("Allocated write ids for the table: " + dbName + "." + tblName); - LOG.debug("Going to commit"); dbConn.commit(); return new AllocateTableWriteIdsResponse(txnToWriteIds); } catch (SQLException e) { - LOG.debug("Going to rollback"); + LOG.error("Exception during write id allocation for request={}.", rqst, e); rollbackDBConn(dbConn); + if (isDuplicateKeyError(e)) { + LOG.warn("Detected concurrent insert of same db/table pair into next_write_id for dbName={}, tblName={}. " + + "Will retry if possible.", dbName, tblName); + if (waitForRetry("allocateTableWriteIds(" + rqst + ")", e.getMessage())) { + throw new RetryException(); + } + } checkRetryable(dbConn, e, "allocateTableWriteIds(" + rqst + ")"); throw new MetaException("Unable to update transaction database " + StringUtils.stringifyException(e)); } finally { - if (insertPreparedStmts != null) { - for (PreparedStatement pst : insertPreparedStmts) { - closeStmt(pst); - } - } - closeStmt(pStmt); - close(rs, stmt, dbConn); - if(handle != null) { - handle.releaseLocks(); - } + close(rs, pStmt, dbConn); unlockInternal(); } } catch (RetryException e) {