diff --git standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 3bfb0e69cb..fefd1195f9 100644
--- standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -644,6 +644,11 @@ public static ConfVars getMetaConf(String name) {
         "However, it doesn't work correctly with integral values that are not normalized (e.g. have\n" +
         "leading zeroes, like 0012). If metastore direct SQL is enabled and works, this optimization\n" +
         "is also irrelevant."),
+    JDBC_MAX_BATCH_SIZE("metastore.jdbc.max.batch.size", "hive.metastore.jdbc.max.batch.size",
+        1000, new RangeValidator(1, null),
+        "Maximum number of update/delete/insert queries in a single JDBC batch statement (applies to Statement and PreparedStatement alike). " +
+            "Once exceeded, the queries are split into separate batches.\nNote: this value is not passed to the JDBC driver, so " +
+            "the batch size limit is not enforced automatically - batches must be constructed and split manually using this config value."),
     KERBEROS_KEYTAB_FILE("metastore.kerberos.keytab.file", "hive.metastore.kerberos.keytab.file", "",
         "The path to the Kerberos Keytab file containing the metastore Thrift server's service principal."),
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index 620c77e589..063d54489a 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -636,14 +636,13 @@ static String getEpochFn(DatabaseProduct dbProduct) throws MetaException {
   /**
    * @param stmt Statement which will be used for batching and execution.
    * @param queries List of sql queries to execute in a Statement batch.
-   * @param conf Configuration for retrieving max batch size param
+   * @param batchSize maximum number of queries in a single batch
    * @return A list with the number of rows affected by each query in queries.
    * @throws SQLException Thrown if an execution error occurs.
    */
-  static List<Integer> executeQueriesInBatch(Statement stmt, List<String> queries, Configuration conf) throws SQLException {
+  static List<Integer> executeQueriesInBatch(Statement stmt, List<String> queries, int batchSize) throws SQLException {
     List<Integer> affectedRowsByQuery = new ArrayList<>();
     int queryCounter = 0;
-    int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
     for (String query : queries) {
       LOG.debug("Adding query to batch: <" + query + ">");
       queryCounter++;
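Note on the TxnDbUtil change above: executeQueriesInBatch now receives the batch size as an explicit parameter instead of deriving it from the Configuration inside the method. A minimal, self-contained sketch of the batching pattern involved, assuming plain java.sql APIs (the class and method names here are illustrative, not code from this patch):

import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

class BatchSketch {
  // Execute queries in batches of at most batchSize statements,
  // flushing with executeBatch() each time the cap is reached.
  static List<Integer> executeInBatches(Statement stmt, List<String> queries, int batchSize)
      throws SQLException {
    List<Integer> affectedRowsByQuery = new ArrayList<>();
    int queryCounter = 0;
    for (String query : queries) {
      stmt.addBatch(query);
      queryCounter++;
      if (queryCounter % batchSize == 0) {
        // Cap reached: run the accumulated batch and record per-query update counts.
        for (int rows : stmt.executeBatch()) {
          affectedRowsByQuery.add(rows);
        }
      }
    }
    if (queryCounter % batchSize != 0) {
      // Run the final, partially filled batch.
      for (int rows : stmt.executeBatch()) {
        affectedRowsByQuery.add(rows);
      }
    }
    return affectedRowsByQuery;
  }
}

The trailing flush matters: without it, any queries left in a partially filled batch would never execute.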
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 7d0db0c3a0..e79b305569 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -270,6 +270,7 @@ char getSqlConst() {
   // (End user) Transaction timeout, in milliseconds.
   private long timeout;
+  private Integer maxBatchSize = null;
   private String identifierQuoteString; // quotes to use for quoting tables, where necessary
   private long retryInterval;
   private int retryLimit;
@@ -1307,7 +1308,7 @@ public void commitTxn(CommitTxnRequest rqst)
     String sql = String.format(COMPL_TXN_COMPONENTS_INSERT_QUERY, txnid, quoteChar(isUpdateDelete));
     try (PreparedStatement pstmt = dbConn.prepareStatement(sql)) {
       int insertCounter = 0;
-      int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
+      int batchSize = getMaxBatchSize();
       for (WriteEventInfo writeEventInfo : rqst.getWriteEventInfos()) {
         pstmt.setString(1, writeEventInfo.getDatabase());
         pstmt.setString(2, writeEventInfo.getTable());
@@ -1393,7 +1394,7 @@ private void cleanUpTxnRelatedMetadata(long txnid, Statement stmt) throws SQLExc
         "DELETE FROM \"TXNS\" WHERE \"TXN_ID\" = " + txnid,
         "DELETE FROM \"MIN_HISTORY_LEVEL\" WHERE \"MHL_TXNID\" = " + txnid,
         "DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE \"MRL_TXN_ID\" = " + txnid);
-    executeQueriesInBatch(stmt, queries, conf);
+    executeQueriesInBatch(stmt, queries, getMaxBatchSize());
     LOG.info("Removed committed transaction: (" + txnid + ") from MIN_HISTORY_LEVEL");
   }
@@ -2480,7 +2481,7 @@ private void insertTxnComponents(long txnid, LockRequest rqst, Connection dbConn
       // For each component in this lock request,
       // add an entry to the txn_components table
       int insertCounter = 0;
-      int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
+      int batchSize = getMaxBatchSize();
       for (LockComponent lc : rqst.getComponent()) {
         if (lc.isSetIsTransactional() && !lc.isIsTransactional()) {
           //we don't prevent using non-acid resources in a txn but we do lock them
@@ -2516,6 +2517,13 @@ private void insertTxnComponents(long txnid, LockRequest rqst, Connection dbConn
     }
   }
+  private int getMaxBatchSize() {
+    if (maxBatchSize == null) {
+      maxBatchSize = MetastoreConf.getIntVar(conf, ConfVars.JDBC_MAX_BATCH_SIZE);
+    }
+    return maxBatchSize;
+  }
+
   private Optional<Long> getWriteId(Map<Pair<String, String>, Optional<Long>> writeIdCache, String dbName, String tblName, long txnid, Connection dbConn) throws SQLException {
     /* we can cache writeIDs based on dbName and tblName because txnid is invariant and partitionName is not part of the writeID select query */
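Note on the hunk above: getMaxBatchSize() reads JDBC_MAX_BATCH_SIZE once and caches it in the maxBatchSize field, so the many call sites avoid repeated Configuration lookups. The PreparedStatement call sites (commitTxn, insertTxnComponents) apply the cap with an insertCounter idiom; a hedged sketch of that idiom under assumed names (PreparedBatchSketch, insertAll, and the single-column SQL are illustrative):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

class PreparedBatchSketch {
  // Add one parameter set per value, executing the accumulated batch
  // every batchSize rows, plus once more for any remainder.
  static void insertAll(Connection dbConn, String sql, List<String> values, int batchSize)
      throws SQLException {
    try (PreparedStatement pstmt = dbConn.prepareStatement(sql)) {
      int insertCounter = 0;
      for (String value : values) {
        pstmt.setString(1, value);
        pstmt.addBatch();
        insertCounter++;
        if (insertCounter % batchSize == 0) {
          pstmt.executeBatch(); // flush a full batch
        }
      }
      if (insertCounter % batchSize != 0) {
        pstmt.executeBatch(); // flush the final partial batch
      }
    }
  }
}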
"0" : TxnDbUtil.getEpochFn(dbProduct); String insertLocksQuery = String.format(HIVE_LOCKS_INSERT_QRY, lastHB); - int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE); + int batchSize = getMaxBatchSize(); long intLockId = 0; try (PreparedStatement pstmt = dbConn.prepareStatement(insertLocksQuery)) { @@ -3331,7 +3339,7 @@ public void addDynamicPartitions(AddDynamicPartitions rqst) Long writeId = rqst.getWriteid(); try (PreparedStatement pstmt = dbConn.prepareStatement(TXN_COMPONENTS_INSERT_QUERY)) { int insertCounter = 0; - int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE); + int batchSize = getMaxBatchSize(); for (String partName : rqst.getPartitionnames()) { pstmt.setLong(1, rqst.getTxnid()); pstmt.setString(2, normalizeCase(rqst.getDbname())); @@ -4292,7 +4300,7 @@ private int abortTxns(Connection dbConn, List txnids, boolean checkHeartbe TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"HL_TXNID\"", false, false); // execute all queries in the list in one batch - List affectedRowsByQuery = executeQueriesInBatch(stmt, queries, conf); + List affectedRowsByQuery = executeQueriesInBatch(stmt, queries, getMaxBatchSize()); LOG.info("Removed aborted transactions: (" + txnids + ") from MIN_HISTORY_LEVEL"); return getUpdateCount(numUpdateQueries, affectedRowsByQuery); } finally {