diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index bb29410e7d..f9e28b9bad 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -51,7 +51,7 @@
  */
 public final class TxnDbUtil {
 
-  private static final Logger LOG = LoggerFactory.getLogger(TxnDbUtil.class.getName());
+  private static final Logger LOG = LoggerFactory.getLogger(TxnDbUtil.class.getName());
   private static final String TXN_MANAGER = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager";
 
   private static final EnumMap<DatabaseProduct, String> DB_EPOCH_FN =
@@ -621,6 +621,7 @@ static void closeResources(Connection conn, Statement stmt, ResultSet rs) {
 
   /**
    * Get database specific function which returns the milliseconds value after the epoch.
+   * @param dbProduct The type of the database in use
    * @throws MetaException For unknown database type.
    */
   static String getEpochFn(DatabaseProduct dbProduct) throws MetaException {
@@ -634,6 +635,47 @@ static String getEpochFn(DatabaseProduct dbProduct) throws MetaException {
     }
   }
 
+  /**
+   * Executes the queries in batch, but does not return the affected row counts. Same as
+   * executeQueriesInBatch, the only difference being when the db is Oracle. In that case the queries
+   * are executed in an anonymous PL/SQL block instead of a JDBC batch, since batching is not optimized. See:
+   * https://docs.oracle.com/cd/E11882_01/java.112/e16548/oraperf.htm#JJDBC28752
+   * @param dbProduct The type of the database in use
+   * @param stmt Statement which will be used for batching and execution.
+   * @param queries List of sql queries to execute in a Statement batch.
+   * @param batchSize maximum number of queries in a single batch
+   * @throws SQLException Thrown if an execution error occurs.
+   */
+  static void executeQueriesInBatchNoCount(DatabaseProduct dbProduct, Statement stmt, List<String> queries, int batchSize) throws SQLException {
+    if (dbProduct == ORACLE) {
+      int queryCounter = 0;
+      StringBuilder sb = new StringBuilder();
+      sb.append("begin ");
+      for (String query : queries) {
+        LOG.debug("Adding query to batch: <" + query + ">");
+        queryCounter++;
+        sb.append(query).append(";");
+        if (queryCounter % batchSize == 0) {
+          sb.append("end;");
+          String batch = sb.toString();
+          LOG.debug("Going to execute queries in oracle anonymous statement. " + batch);
+          stmt.execute(batch);
+          // start a fresh anonymous block for the next batch
+          sb.setLength(0);
+          sb.append("begin ");
+        }
+      }
+      if (queryCounter % batchSize != 0) {
+        sb.append("end;");
+        String batch = sb.toString();
+        LOG.debug("Going to execute queries in oracle anonymous statement. " + batch);
+        stmt.execute(batch);
+      }
+    } else {
+      executeQueriesInBatch(stmt, queries, batchSize);
+    }
+  }
+
   /**
    * @param stmt Statement which will be used for batching and execution.
    * @param queries List of sql queries to execute in a Statement batch.
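The sketch below is illustrative only and not part of the patch; the class and method names OracleAnonymousBlockDemo and runInAnonymousBlocks are invented, and a java.sql.Connection to an Oracle database is assumed. It shows the technique the Oracle branch of executeQueriesInBatchNoCount relies on: up to batchSize statements are wrapped into a single anonymous "begin ...; end;" PL/SQL block and sent to the server with one Statement.execute() round trip, because plain JDBC statement batching is not optimized on Oracle (see the link cited in the Javadoc above).

// Illustrative sketch only: mirrors the Oracle path of executeQueriesInBatchNoCount.
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;

public class OracleAnonymousBlockDemo {

  // Runs the given SQL statements in anonymous PL/SQL blocks of at most batchSize statements each.
  static void runInAnonymousBlocks(Connection conn, List<String> queries, int batchSize) throws SQLException {
    try (Statement stmt = conn.createStatement()) {
      StringBuilder sb = new StringBuilder("begin ");
      int count = 0;
      for (String query : queries) {
        sb.append(query).append(";");
        if (++count % batchSize == 0) {
          stmt.execute(sb.append("end;").toString()); // one round trip for the whole block
          sb.setLength(0);
          sb.append("begin ");                        // start the next block
        }
      }
      if (count % batchSize != 0) {
        stmt.execute(sb.append("end;").toString());   // flush the remaining statements
      }
    }
  }
}

On other databases the helper simply falls through to the existing executeQueriesInBatch, so behaviour only changes where the anonymous block pays off.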
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index d080df417b..d55df6e773 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -94,6 +94,7 @@
 
 import static org.apache.hadoop.hive.metastore.DatabaseProduct.MYSQL;
 import static org.apache.hadoop.hive.metastore.txn.TxnDbUtil.executeQueriesInBatch;
+import static org.apache.hadoop.hive.metastore.txn.TxnDbUtil.executeQueriesInBatchNoCount;
 import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
 import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
 
@@ -1195,7 +1196,7 @@ public void commitTxn(CommitTxnRequest rqst)
           dbConn.rollback(undoWriteSetForCurrentTxn);
           LOG.info(msg);
           //todo: should make abortTxns() write something into TXNS.TXN_META_INFO about this
-          if (abortTxns(dbConn, Collections.singletonList(txnid), true) != 1) {
+          if (abortTxns(dbConn, Collections.singletonList(txnid), false) != 1) {
             throw new IllegalStateException(msg + " FAILED!");
           }
           dbConn.commit();
@@ -1345,7 +1346,7 @@ private void updateCommitIdAndCleanUpMetadata(Statement stmt, long txnid, TxnTyp
 
     queryBatch.add("DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE \"MRL_TXN_ID\" = " + txnid);
     // execute all in one batch
-    executeQueriesInBatch(stmt, queryBatch, maxBatchSize);
+    executeQueriesInBatchNoCount(dbProduct, stmt, queryBatch, maxBatchSize);
   }
 
   private void updateKeyValueAssociatedWithTxn(CommitTxnRequest rqst, Statement stmt) throws SQLException {
@@ -1446,7 +1447,7 @@ public void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaEx
         }
 
         // Abort all the allocated txns so that the mapped write ids are referred as aborted ones.
-        int numAborts = abortTxns(dbConn, txnIds, true);
+        int numAborts = abortTxns(dbConn, txnIds, false);
         assert(numAborts == numAbortedWrites);
       }
       handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name());
@@ -4193,12 +4194,12 @@ private void checkQFileTestHack() {
     }
   }
 
-  private int abortTxns(Connection dbConn, List<Long> txnids, boolean isStrict) throws SQLException, MetaException {
-    return abortTxns(dbConn, txnids, false, isStrict);
+  private int abortTxns(Connection dbConn, List<Long> txnids, boolean skipCount) throws SQLException, MetaException {
+    return abortTxns(dbConn, txnids, false, skipCount);
   }
   /**
    * TODO: expose this as an operation to client. Useful for streaming API to abort all remaining
-   * trasnactions in a batch on IOExceptions.
+   * transactions in a batch on IOExceptions.
    * Caller must rollback the transaction if not all transactions were aborted since this will not
    * attempt to delete associated locks in this case.
    *
@@ -4206,14 +4207,11 @@ private int abortTxns(Connection dbConn, List<Long> txnids, boolean isStrict) th
    * @param txnids list of transactions to abort
    * @param checkHeartbeat value used by {@link #performTimeOuts()} to ensure this doesn't Abort txn which were
    *                       heartbeated after #performTimeOuts() select and this operation.
-   * @param isStrict true for strict mode, false for best-effort mode.
-   *                 In strict mode, if all txns are not successfully aborted, then the count of
-   *                 updated ones will be returned and the caller will roll back.
-   *                 In best-effort mode, we will ignore that fact and continue deleting the locks.
-   * @return Number of aborted transactions
+   * @param skipCount If true, the method always returns 0; otherwise it returns the number of actually aborted txns
+   * @return 0 if skipCount is true, the number of aborted transactions otherwise
    * @throws SQLException
    */
-  private int abortTxns(Connection dbConn, List<Long> txnids, boolean checkHeartbeat, boolean isStrict)
+  private int abortTxns(Connection dbConn, List<Long> txnids, boolean checkHeartbeat, boolean skipCount)
       throws SQLException, MetaException {
     Statement stmt = null;
     if (txnids.isEmpty()) {
@@ -4244,8 +4242,13 @@ private int abortTxns(Connection dbConn, List<Long> txnids, boolean checkHeartbe
       TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"HL_TXNID\"", false, false);
 
       // execute all queries in the list in one batch
-      List<Integer> affectedRowsByQuery = executeQueriesInBatch(stmt, queries, maxBatchSize);
-      return getUpdateCount(numUpdateQueries, affectedRowsByQuery);
+      if (skipCount) {
+        executeQueriesInBatchNoCount(dbProduct, stmt, queries, maxBatchSize);
+        return 0;
+      } else {
+        List<Integer> affectedRowsByQuery = executeQueriesInBatch(stmt, queries, maxBatchSize);
+        return getUpdateCount(numUpdateQueries, affectedRowsByQuery);
+      }
     } finally {
       closeStmt(stmt);
     }
@@ -4347,7 +4350,7 @@ private LockResponse checkLock(Connection dbConn, long extLockId, long txnId)
             " since a concurrent committed transaction [" + JavaUtils.txnIdToString(rs.getLong(4)) + "," +
             rs.getLong(5) + "] has already updated resource '" + resourceName + "'";
           LOG.info(msg);
-          if(abortTxns(dbConn, Collections.singletonList(writeSet.get(0).txnId), true) != 1) {
+          if (abortTxns(dbConn, Collections.singletonList(writeSet.get(0).txnId), false) != 1) {
            throw new IllegalStateException(msg + " FAILED!");
           }
           dbConn.commit();
@@ -4875,7 +4878,7 @@ public void performTimeOuts() {
       close(rs, stmt, null);
       int numTxnsAborted = 0;
       for(List<Long> batchToAbort : timedOutTxns) {
-        if(abortTxns(dbConn, batchToAbort, true, true) == batchToAbort.size()) {
+        if(abortTxns(dbConn, batchToAbort, true, false) == batchToAbort.size()) {
           dbConn.commit();
           numTxnsAborted += batchToAbort.size();
           //todo: add TXNS.COMMENT filed and set it to 'aborted by system due to timeout'
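Again illustrative only, not part of the patch; CountingBatchDemo and executeAndCount are invented names. The sketch below shows the counting path that abortTxns keeps when skipCount is false: JDBC batching returns one update count per statement, which can be summed and compared with the number of transactions the caller expected to abort (the "!= 1" and "== batchToAbort.size()" checks above). An anonymous Oracle block executed through Statement.execute() yields no per-statement counts, which is why abortTxns only takes the executeQueriesInBatchNoCount path when the caller passes skipCount = true and therefore cannot rely on the return value.

// Illustrative sketch only: the count-collecting JDBC batching that skipCount = false preserves.
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;

public class CountingBatchDemo {

  // Executes the statements in JDBC batches of batchSize and sums the reported update counts.
  static int executeAndCount(Connection conn, List<String> queries, int batchSize) throws SQLException {
    int affected = 0;
    try (Statement stmt = conn.createStatement()) {
      int inBatch = 0;
      for (String query : queries) {
        stmt.addBatch(query);
        if (++inBatch == batchSize) {
          affected += sum(stmt.executeBatch()); // executeBatch() clears the batch afterwards
          inBatch = 0;
        }
      }
      if (inBatch > 0) {
        affected += sum(stmt.executeBatch());   // flush the remainder
      }
    }
    return affected;
  }

  // Some drivers report Statement.SUCCESS_NO_INFO (-2) instead of a real count; skip those values.
  private static int sum(int[] updateCounts) {
    int total = 0;
    for (int rows : updateCounts) {
      if (rows > 0) {
        total += rows;
      }
    }
    return total;
  }
}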