diff --git standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 3bfb0e69cb..f6097f7520 100644
--- standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -644,6 +644,12 @@ public static ConfVars getMetaConf(String name) {
         "However, it doesn't work correctly with integral values that are not normalized (e.g. have\n" +
         "leading zeroes, like 0012). If metastore direct SQL is enabled and works, this optimization\n" +
         "is also irrelevant."),
+    // Once exceeded, the queries should be broken into separate batches.
+    // Note: This value is not passed into the JDBC driver, therefore this batch size limit is not automatically enforced.
+    // Batch construction/splits should be done manually in code using this config value.
+    JDBC_MAX_BATCH_SIZE("metastore.jdbc.max.batch.size", "hive.metastore.jdbc.max.batch.size",
+        1000, new RangeValidator(1, null),
+        "Maximum number of update/delete/insert queries in a single JDBC batch statement (including Statement/PreparedStatement)."),
     KERBEROS_KEYTAB_FILE("metastore.kerberos.keytab.file",
         "hive.metastore.kerberos.keytab.file", "",
         "The path to the Kerberos Keytab file containing the metastore Thrift server's service principal."),
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index a66e16973f..bb29410e7d 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -637,14 +637,13 @@ static String getEpochFn(DatabaseProduct dbProduct) throws MetaException {
   /**
    * @param stmt Statement which will be used for batching and execution.
    * @param queries List of sql queries to execute in a Statement batch.
-   * @param conf Configuration for retrieving max batch size param
+   * @param batchSize maximum number of queries in a single batch
    * @return A list with the number of rows affected by each query in queries.
    * @throws SQLException Thrown if an execution error occurs.
    */
-  static List<Integer> executeQueriesInBatch(Statement stmt, List<String> queries, Configuration conf) throws SQLException {
+  static List<Integer> executeQueriesInBatch(Statement stmt, List<String> queries, int batchSize) throws SQLException {
     List<Integer> affectedRowsByQuery = new ArrayList<>();
     int queryCounter = 0;
-    int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
     for (String query : queries) {
       LOG.debug("Adding query to batch: <" + query + ">");
       queryCounter++;
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 331fd4cc8d..7d65c34c4e 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -268,6 +268,7 @@ char getSqlConst() {
   // (End user) Transaction timeout, in milliseconds.
   private long timeout;
+  private int maxBatchSize;
   private String identifierQuoteString; // quotes to use for quoting tables, where necessary
   private long retryInterval;
   private int retryLimit;
@@ -345,6 +346,7 @@ public void setConf(Configuration conf) {
     retryLimit = MetastoreConf.getIntVar(conf, ConfVars.HMS_HANDLER_ATTEMPTS);
     deadlockRetryInterval = retryInterval / 10;
     maxOpenTxns = MetastoreConf.getIntVar(conf, ConfVars.MAX_OPEN_TXNS);
+    maxBatchSize = MetastoreConf.getIntVar(conf, ConfVars.JDBC_MAX_BATCH_SIZE);

     try {
       transactionalListeners = MetaStoreServerUtils.getMetaStoreListeners(
@@ -1263,7 +1265,6 @@ public void commitTxn(CommitTxnRequest rqst)
         String sql = String.format(COMPL_TXN_COMPONENTS_INSERT_QUERY, txnid, quoteChar(isUpdateDelete));
         try (PreparedStatement pstmt = dbConn.prepareStatement(sql)) {
           int insertCounter = 0;
-          int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
           for (WriteEventInfo writeEventInfo : rqst.getWriteEventInfos()) {
             pstmt.setString(1, writeEventInfo.getDatabase());
             pstmt.setString(2, writeEventInfo.getTable());
@@ -1272,13 +1273,13 @@ public void commitTxn(CommitTxnRequest rqst)
             pstmt.addBatch();
             insertCounter++;
-            if (insertCounter % batchSize == 0) {
-              LOG.debug("Executing a batch of <" + sql + "> queries. Batch size: " + batchSize);
+            if (insertCounter % maxBatchSize == 0) {
+              LOG.debug("Executing a batch of <" + sql + "> queries. Batch size: " + maxBatchSize);
               pstmt.executeBatch();
             }
           }
-          if (insertCounter % batchSize != 0) {
-            LOG.debug("Executing a batch of <" + sql + "> queries. Batch size: " + insertCounter % batchSize);
+          if (insertCounter % maxBatchSize != 0) {
+            LOG.debug("Executing a batch of <" + sql + "> queries. Batch size: " + insertCounter % maxBatchSize);
             pstmt.executeBatch();
           }
         }
@@ -1347,7 +1348,7 @@ private void cleanUpTxnRelatedMetadata(long txnid, Statement stmt) throws SQLExc
         "DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_TXNID\" = " + txnid,
         "DELETE FROM \"TXNS\" WHERE \"TXN_ID\" = " + txnid,
         "DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE \"MRL_TXN_ID\" = " + txnid);
-    executeQueriesInBatch(stmt, queries, conf);
+    executeQueriesInBatch(stmt, queries, maxBatchSize);
   }

   /**
@@ -2433,7 +2434,6 @@ private void insertTxnComponents(long txnid, LockRequest rqst, Connection dbConn
       // For each component in this lock request,
       // add an entry to the txn_components table
       int insertCounter = 0;
-      int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
       for (LockComponent lc : rqst.getComponent()) {
         if (lc.isSetIsTransactional() && !lc.isIsTransactional()) {
           //we don't prevent using non-acid resources in a txn but we do lock them
@@ -2456,13 +2456,13 @@ private void insertTxnComponents(long txnid, LockRequest rqst, Connection dbConn
         pstmt.addBatch();
         insertCounter++;
-        if (insertCounter % batchSize == 0) {
-          LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + batchSize);
+        if (insertCounter % maxBatchSize == 0) {
+          LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + maxBatchSize);
           pstmt.executeBatch();
         }
       }
-      if (insertCounter % batchSize != 0) {
-        LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + insertCounter % batchSize);
+      if (insertCounter % maxBatchSize != 0) {
+        LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + insertCounter % maxBatchSize);
         pstmt.executeBatch();
       }
     }
@@ -2545,7 +2545,6 @@ private void insertHiveLocksWithTemporaryExtLockId(long txnid, Connection dbConn
     String lastHB = isValidTxn(txnid) ? "0" : TxnDbUtil.getEpochFn(dbProduct);
     String insertLocksQuery = String.format(HIVE_LOCKS_INSERT_QRY, lastHB);
-    int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
     long intLockId = 0;

     try (PreparedStatement pstmt = dbConn.prepareStatement(insertLocksQuery)) {
@@ -2577,13 +2576,13 @@ private void insertHiveLocksWithTemporaryExtLockId(long txnid, Connection dbConn
         pstmt.setString(11, rqst.getAgentInfo());
         pstmt.addBatch();
-        if (intLockId % batchSize == 0) {
-          LOG.debug("Executing HIVE_LOCKS inserts in batch. Batch size: " + batchSize);
+        if (intLockId % maxBatchSize == 0) {
+          LOG.debug("Executing HIVE_LOCKS inserts in batch. Batch size: " + maxBatchSize);
           pstmt.executeBatch();
         }
       }
-      if (intLockId % batchSize != 0) {
-        LOG.debug("Executing HIVE_LOCKS inserts in batch. Batch size: " + intLockId % batchSize);
+      if (intLockId % maxBatchSize != 0) {
+        LOG.debug("Executing HIVE_LOCKS inserts in batch. Batch size: " + intLockId % maxBatchSize);
         pstmt.executeBatch();
       }
     }
@@ -3272,7 +3271,6 @@ public void addDynamicPartitions(AddDynamicPartitions rqst)
         Long writeId = rqst.getWriteid();
         try (PreparedStatement pstmt = dbConn.prepareStatement(TXN_COMPONENTS_INSERT_QUERY)) {
           int insertCounter = 0;
-          int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
           for (String partName : rqst.getPartitionnames()) {
             pstmt.setLong(1, rqst.getTxnid());
             pstmt.setString(2, normalizeCase(rqst.getDbname()));
@@ -3283,13 +3281,13 @@ public void addDynamicPartitions(AddDynamicPartitions rqst)
             pstmt.addBatch();
             insertCounter++;
-            if (insertCounter % batchSize == 0) {
-              LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + batchSize);
+            if (insertCounter % maxBatchSize == 0) {
+              LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + maxBatchSize);
               pstmt.executeBatch();
             }
           }
-          if (insertCounter % batchSize != 0) {
-            LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + insertCounter % batchSize);
+          if (insertCounter % maxBatchSize != 0) {
+            LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + insertCounter % maxBatchSize);
             pstmt.executeBatch();
           }
         }
@@ -4219,7 +4217,7 @@ private int abortTxns(Connection dbConn, List<Long> txnids, boolean checkHeartbe
       TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"HL_TXNID\"", false, false);
       // execute all queries in the list in one batch
-      List<Integer> affectedRowsByQuery = executeQueriesInBatch(stmt, queries, conf);
+      List<Integer> affectedRowsByQuery = executeQueriesInBatch(stmt, queries, maxBatchSize);
       return getUpdateCount(numUpdateQueries, affectedRowsByQuery);
     } finally {
       closeStmt(stmt);