diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/BoneCPDataSourceProvider.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/BoneCPDataSourceProvider.java
index f92ce7325e..de202c9bfc 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/BoneCPDataSourceProvider.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/BoneCPDataSourceProvider.java
@@ -89,6 +89,7 @@ public DataSource create(Configuration hdpConfig) throws SQLException {
     DatabaseProduct dbProduct = determineDatabaseProduct(driverUrl);
     switch (dbProduct){
     case MYSQL:
+      connProperties.put("allowMultiQueries", true);
       connProperties.put("rewriteBatchedStatements", true);
       break;
     case POSTGRES:
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java
index 85719fdf84..12cacd2bed 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java
@@ -71,6 +71,7 @@ public DataSource create(Configuration hdpConfig) throws SQLException {
     DatabaseProduct dbProduct = determineDatabaseProduct(driverUrl);
     switch (dbProduct){
     case MYSQL:
+      dbcpDs.setConnectionProperties("allowMultiQueries=true");
       dbcpDs.setConnectionProperties("rewriteBatchedStatements=true");
       break;
     case POSTGRES:
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java
index 76bbf3bc1e..3e871e9fba 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java
@@ -76,6 +76,7 @@ public DataSource create(Configuration hdpConfig) throws SQLException {
     switch (dbProduct){
     case MYSQL:
       config.setConnectionInitSql("SET @@session.sql_mode=ANSI_QUOTES");
+      config.addDataSourceProperty("allowMultiQueries", true);
       config.addDataSourceProperty("rewriteBatchedStatements", true);
       break;
     case POSTGRES:
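These hunks enable two MySQL Connector/J flags for every pool implementation: allowMultiQueries (several ';'-separated statements per JDBC call) and rewriteBatchedStatements (JDBC batches rewritten into multi-row INSERTs), which the TxnHandler changes below lean on for batched writes. A minimal, self-contained sketch of the same configuration on a plain HikariCP pool (the URL and credentials are illustrative, not metastore configuration):

```java
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

public class MySqlPoolExample {
  public static void main(String[] args) {
    HikariConfig config = new HikariConfig();
    config.setJdbcUrl("jdbc:mysql://localhost:3306/metastore"); // illustrative URL
    config.setUsername("hive");
    config.setPassword("secret");
    // allowMultiQueries lets one statement carry several ';'-separated commands;
    // rewriteBatchedStatements folds JDBC batches into multi-row INSERTs.
    config.addDataSourceProperty("allowMultiQueries", true);
    config.addDataSourceProperty("rewriteBatchedStatements", true);
    try (HikariDataSource ds = new HikariDataSource(config)) {
      // hand ds to whatever needs pooled connections
    }
  }
}
```

One caveat worth noting for the DBCP variant: BasicDataSource.setConnectionProperties replaces the previously set property string rather than appending to it, so the two consecutive calls above may leave only rewriteBatchedStatements in effect; a single combined call ("allowMultiQueries=true;rewriteBatchedStatements=true"), or one addConnectionProperty call per flag, would avoid that.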
diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 74ef88545e..7e1941b1fb 100644
--- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -39,10 +39,12 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
@@ -52,6 +54,7 @@
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.NotImplementedException;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -190,6 +193,14 @@
       "\"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", \"HL_LAST_HEARTBEAT\", \"HL_USER\", \"HL_HOST\", \"HL_AGENT_INFO\") " +
       "VALUES (?, ?, ?, ?, ?, ?, ?, ?, %s, ?, ?, ?)";
 
+  private static final String TXN_COMPONENTS_INSERT_QUERY = "INSERT INTO \"TXN_COMPONENTS\" (" +
+      "\"TC_TXNID\", \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_OPERATION_TYPE\", \"TC_WRITEID\")" +
+      " VALUES (?, ?, ?, ?, ?, ?)";
+  private static final String INCREMENT_NEXT_LOCK_ID_QUERY = "UPDATE \"NEXT_LOCK_ID\" SET \"NL_NEXT\" = %d";
+  private static final String UPDATE_HIVE_LOCKS_EXT_ID_QUERY = "UPDATE \"HIVE_LOCKS\" SET \"HL_LOCK_EXT_ID\" = %d WHERE \"HL_LOCK_EXT_ID\" = %d";
+  private static final String SELECT_WRITE_ID_QUERY = "SELECT \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" WHERE" +
+      " \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND \"T2W_TXNID\" = ?";
+
   private List<TransactionalMetaStoreEventListener> transactionalListeners;
 
   /**
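Worth calling out: SELECT_WRITE_ID_QUERY is now fully parameterized, whereas in the old code removed below the txnid was concatenated into the SQL string and only the db and table names were bound. A self-contained sketch of the resulting lookup pattern in isolation (hypothetical helper class; the connection is assumed to come from a pool like the one above):

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Optional;

final class WriteIdLookup {
  private static final String SELECT_WRITE_ID_QUERY =
      "SELECT \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" WHERE"
          + " \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND \"T2W_TXNID\" = ?";

  static Optional<Long> lookup(Connection conn, String db, String table, long txnId)
      throws SQLException {
    try (PreparedStatement ps = conn.prepareStatement(SELECT_WRITE_ID_QUERY)) {
      ps.setString(1, db);
      ps.setString(2, table);
      ps.setLong(3, txnId);   // previously concatenated into the SQL string
      try (ResultSet rs = ps.executeQuery()) {
        return rs.next() ? Optional.of(rs.getLong(1)) : Optional.empty();
      }
    }
  }
}
```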
@@ -2394,14 +2405,11 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc
     Connection dbConn = null;
     try {
       Statement stmt = null;
-      PreparedStatement pStmt = null;
-      List<PreparedStatement> insertPreparedStmts = null;
-      ResultSet rs = null;
       try {
         lockInternal();
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        long txnid = rqst.getTxnid();
         stmt = dbConn.createStatement();
+        long txnid = rqst.getTxnid();
         if (isValidTxn(txnid)) {
           //this also ensures that txn is still there in expected state
           TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TXN_OPEN);
@@ -2410,175 +2418,20 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc
             shouldNeverHappen(txnid);
           }
         }
+        /* Insert txn components and hive locks (with a temp extLockId) first, before getting the next lock ID in a select-for-update.
+           This should minimize the scope of the S4U and decrease the table lock duration. */
+        insertTxnComponents(txnid, rqst, dbConn);
+        long tempLockId = insertHiveLocksWithTemporaryExtLockId(txnid, dbConn, rqst);
+
         /** Get the next lock id.
          * This has to be atomic with adding entries to HIVE_LOCK entries (1st add in W state) to prevent a race.
          * Suppose ID gen is a separate txn and 2 concurrent lock() methods are running. 1st one generates nl_next=7,
          * 2nd nl_next=8. Then 8 goes first to insert into HIVE_LOCKS and acquires the locks. Then 7 unblocks,
         * and add it's W locks but it won't see locks from 8 since to be 'fair' {@link #checkLock(java.sql.Connection, long)}
         * doesn't block on locks acquired later than one it's checking*/
-        String s = sqlGenerator.addForUpdateClause("SELECT \"NL_NEXT\" FROM \"NEXT_LOCK_ID\"");
-        LOG.debug("Going to execute query <" + s + ">");
-        rs = stmt.executeQuery(s);
-        if (!rs.next()) {
-          LOG.debug("Going to rollback");
-          dbConn.rollback();
-          throw new MetaException("Transaction tables not properly " +
-              "initialized, no record found in next_lock_id");
-        }
-        long extLockId = rs.getLong(1);
-        s = "UPDATE \"NEXT_LOCK_ID\" SET \"NL_NEXT\" = " + (extLockId + 1);
-        LOG.debug("Going to execute update <" + s + ">");
-        stmt.executeUpdate(s);
+        long extLockId = getNextLockIdForUpdate(dbConn, stmt);
+        incrementLockIdAndUpdateHiveLocks(stmt, extLockId, tempLockId);
 
-        if (txnid > 0) {
-          List<String> rows = new ArrayList<>();
-          List<List<String>> paramsList = new ArrayList<>();
-          // For each component in this lock request,
-          // add an entry to the txn_components table
-          for (LockComponent lc : rqst.getComponent()) {
-            if(lc.isSetIsTransactional() && !lc.isIsTransactional()) {
-              //we don't prevent using non-acid resources in a txn but we do lock them
-              continue;
-            }
-            boolean updateTxnComponents;
-            if(!lc.isSetOperationType()) {
-              //request came from old version of the client
-              updateTxnComponents = true;//this matches old behavior
-            }
-            else {
-              switch (lc.getOperationType()) {
-                case INSERT:
-                case UPDATE:
-                case DELETE:
-                  if(!lc.isSetIsDynamicPartitionWrite()) {
-                    //must be old client talking, i.e. we don't know if it's DP so be conservative
-                    updateTxnComponents = true;
-                  }
-                  else {
-                    /**
-                     * we know this is part of DP operation and so we'll get
-                     * {@link #addDynamicPartitions(AddDynamicPartitions)} call with the list
-                     * of partitions actually chaged.
-                     */
-                    updateTxnComponents = !lc.isIsDynamicPartitionWrite();
-                  }
-                  break;
-                case SELECT:
-                  updateTxnComponents = false;
-                  break;
-                case NO_TXN:
-                  /*this constant is a bit of a misnomer since we now always have a txn context.  It
-                   just means the operation is such that we don't care what tables/partitions it
-                   affected as it doesn't trigger a compaction or conflict detection.  A better name
-                   would be NON_TRANSACTIONAL.*/
-                  updateTxnComponents = false;
-                  break;
-                default:
-                  //since we have an open transaction, only 4 values above are expected
-                  throw new IllegalStateException("Unexpected DataOperationType: " + lc.getOperationType()
-                      + " agentInfo=" + rqst.getAgentInfo() + " " + JavaUtils.txnIdToString(txnid));
-              }
-            }
-            if(!updateTxnComponents) {
-              continue;
-            }
-            String dbName = normalizeCase(lc.getDbname());
-            String tblName = normalizeCase(lc.getTablename());
-            String partName = normalizeCase(lc.getPartitionname());
-            Long writeId = null;
-            if (tblName != null) {
-              // It is assumed the caller have already allocated write id for adding/updating data to
-              // the acid tables. However, DDL operatons won't allocate write id and hence this query
-              // may return empty result sets.
-              // Get the write id allocated by this txn for the given table writes
-              s = "SELECT \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" WHERE"
-                  + " \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND \"T2W_TXNID\" = " + txnid;
AND \"T2W_TXNID\" = " + txnid; - pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, Arrays.asList(dbName, tblName)); - LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">", - quoteString(dbName), quoteString(tblName)); - rs = pStmt.executeQuery(); - if (rs.next()) { - writeId = rs.getLong(1); - } - } - rows.add(txnid + ", ?, " + - (tblName == null ? "null" : "?") + ", " + - (partName == null ? "null" : "?")+ "," + - quoteString(OperationType.fromDataOperationType(lc.getOperationType()).toString())+ "," + - (writeId == null ? "null" : writeId)); - List params = new ArrayList<>(); - params.add(dbName); - if (tblName != null) { - params.add(tblName); - } - if (partName != null) { - params.add(partName); - } - paramsList.add(params); - } - insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, - "\"TXN_COMPONENTS\" (\"TC_TXNID\", \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_OPERATION_TYPE\", \"TC_WRITEID\")", - rows, paramsList); - for(PreparedStatement pst : insertPreparedStmts) { - int modCount = pst.executeUpdate(); - closeStmt(pst); - } - insertPreparedStmts = null; - } - String lastHB = isValidTxn(txnid) ? "0" : TxnDbUtil.getEpochFn(dbProduct); - String insertLocksQuery = String.format(HIVE_LOCKS_INSERT_QRY, lastHB); - - int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE); - long intLockId = 0; - - try (PreparedStatement pstmt = dbConn.prepareStatement(insertLocksQuery)) { - for (LockComponent lc : rqst.getComponent()) { - if (lc.isSetOperationType() && lc.getOperationType() == DataOperationType.UNSET && - (MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) || MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST))) { - //Old version of thrift client should have (lc.isSetOperationType() == false), but they do not - //If you add a default value to a variable, isSet() for that variable is true regardless of the where the - //message was created (for object variables). - //It works correctly for boolean vars, e.g. LockComponent.isTransactional). - //in test mode, upgrades are not tested, so client version and server version of thrift always matches so - //we see UNSET here it means something didn't set the appropriate value. 
-              throw new IllegalStateException("Bug: operationType=" + lc.getOperationType() + " for component "
-                  + lc + " agentInfo=" + rqst.getAgentInfo());
-            }
-            intLockId++;
-
-            char lockChar = 'z';
-            switch (lc.getType()) {
-              case EXCLUSIVE:
-                lockChar = LOCK_EXCLUSIVE;
-                break;
-              case SHARED_READ:
-                lockChar = LOCK_SHARED;
-                break;
-              case SHARED_WRITE:
-                lockChar = LOCK_SEMI_SHARED;
-                break;
-            }
-            pstmt.setLong(1, extLockId);
-            pstmt.setLong(2, intLockId);
-            pstmt.setLong(3, txnid);
-            pstmt.setString(4, normalizeCase(lc.getDbname()));
-            pstmt.setString(5, normalizeCase(lc.getTablename()));
-            pstmt.setString(6, normalizeCase(lc.getPartitionname()));
-            pstmt.setString(7, Character.toString(LOCK_WAITING));
-            pstmt.setString(8, Character.toString(lockChar));
-            pstmt.setString(9, rqst.getUser());
-            pstmt.setString(10, rqst.getHostname());
-            pstmt.setString(11, rqst.getAgentInfo());
-
-            pstmt.addBatch();
-            if (intLockId % batchSize == 0) {
-              pstmt.executeBatch();
-            }
-          }
-          if (intLockId % batchSize != 0) {
-            pstmt.executeBatch();
-          }
-        }
         dbConn.commit();
         success = true;
         return new ConnectionLockIdPair(dbConn, extLockId);
@@ -2589,13 +2442,7 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc
         throw new MetaException("Unable to update transaction database " +
             StringUtils.stringifyException(e));
       } finally {
-        if (insertPreparedStmts != null) {
-          for (PreparedStatement pst : insertPreparedStmts) {
-            closeStmt(pst);
-          }
-        }
-        closeStmt(pStmt);
-        close(rs, stmt, null);
+        closeStmt(stmt);
         if (!success) {
           /* This needs to return a "live" connection to be used by operation that follows it.
              Thus it only closes Connection on failure/retry. */
@@ -2608,6 +2455,212 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc
       return enqueueLockWithRetry(rqst);
     }
   }
+
+  private long getNextLockIdForUpdate(Connection dbConn, Statement stmt) throws SQLException, MetaException {
+    String s = sqlGenerator.addForUpdateClause("SELECT \"NL_NEXT\" FROM \"NEXT_LOCK_ID\"");
+    LOG.debug("Going to execute query <" + s + ">");
+    try (ResultSet rs = stmt.executeQuery(s)) {
+      if (!rs.next()) {
+        LOG.debug("Going to rollback");
+        dbConn.rollback();
+        throw new MetaException("Transaction tables not properly " +
+            "initialized, no record found in next_lock_id");
+      }
+      return rs.getLong(1);
+    }
+  }
+
+  private void incrementLockIdAndUpdateHiveLocks(Statement stmt, long extLockId, long tempLockId) throws SQLException {
+    String incrCmd = String.format(INCREMENT_NEXT_LOCK_ID_QUERY, (extLockId + 1));
+    // update hive locks entries with the real LOCK_EXT_ID (replace temp ID)
+    String updateLocksCmd = String.format(UPDATE_HIVE_LOCKS_EXT_ID_QUERY, extLockId, tempLockId);
+    LOG.debug("Going to execute updates in batch: <" + incrCmd + ">, and <" + updateLocksCmd + ">");
+    stmt.addBatch(incrCmd);
+    stmt.addBatch(updateLocksCmd);
+    stmt.executeBatch();
+  }
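The two helpers above implement the reordered protocol described in the inline comment: rows are staged under a random temporary external id, and only the increment-and-swap step runs while the NEXT_LOCK_ID row lock is held. A simplified, standalone sketch of that swap (assuming auto-commit is off and the caller commits, which releases the row lock; table and column names follow the constants above):

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

final class LockIdSwap {
  static long assignRealLockId(Connection conn, long tempId) throws SQLException {
    try (Statement stmt = conn.createStatement()) {
      long next;
      // short critical section: the row lock is held only from here to commit
      try (ResultSet rs = stmt.executeQuery(
          "SELECT \"NL_NEXT\" FROM \"NEXT_LOCK_ID\" FOR UPDATE")) {
        if (!rs.next()) throw new SQLException("NEXT_LOCK_ID not initialized");
        next = rs.getLong(1);
      }
      stmt.addBatch("UPDATE \"NEXT_LOCK_ID\" SET \"NL_NEXT\" = " + (next + 1));
      stmt.addBatch("UPDATE \"HIVE_LOCKS\" SET \"HL_LOCK_EXT_ID\" = " + next
          + " WHERE \"HL_LOCK_EXT_ID\" = " + tempId);
      stmt.executeBatch(); // two UPDATEs in one round trip where the driver allows it
      return next;        // caller commits, releasing the NEXT_LOCK_ID row lock
    }
  }
}
```

This is also where the allowMultiQueries flag from the first hunks pays off on MySQL: the batched statements can travel in a single multi-statement call.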
+
+  private void insertTxnComponents(long txnid, LockRequest rqst, Connection dbConn) throws SQLException {
+    if (txnid > 0) {
+      Map<Pair<String, String>, Optional<Long>> writeIdCache = new HashMap<>();
+      try (PreparedStatement pstmt = dbConn.prepareStatement(TXN_COMPONENTS_INSERT_QUERY)) {
+        // For each component in this lock request,
+        // add an entry to the txn_components table
+        int insertCounter = 0;
+        int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
+        for (LockComponent lc : rqst.getComponent()) {
+          if (lc.isSetIsTransactional() && !lc.isIsTransactional()) {
+            //we don't prevent using non-acid resources in a txn but we do lock them
+            continue;
+          }
+          if (!shouldUpdateTxnComponent(txnid, rqst, lc)) {
+            continue;
+          }
+          String dbName = normalizeCase(lc.getDbname());
+          String tblName = normalizeCase(lc.getTablename());
+          String partName = normalizeCase(lc.getPartitionname());
+          Optional<Long> writeId = getWriteId(writeIdCache, dbName, tblName, txnid, dbConn);
+
+          pstmt.setLong(1, txnid);
+          pstmt.setString(2, dbName);
+          pstmt.setString(3, tblName);
+          pstmt.setString(4, partName);
+          pstmt.setString(5, OperationType.fromDataOperationType(lc.getOperationType()).toString());
+          pstmt.setObject(6, writeId.orElse(null));
+
+          pstmt.addBatch();
+          insertCounter++;
+          if (insertCounter % batchSize == 0) {
+            LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + batchSize);
+            pstmt.executeBatch();
+          }
+        }
+        if (insertCounter % batchSize != 0) {
+          LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + insertCounter % batchSize);
+          pstmt.executeBatch();
+        }
+      }
+    }
+  }
+
+  private Optional<Long> getWriteId(Map<Pair<String, String>, Optional<Long>> writeIdCache, String dbName,
+      String tblName, long txnid, Connection dbConn) throws SQLException {
+    /* we can cache writeIDs based on dbName and tblName because txnid is invariant and
+       partitionName is not part of the writeID select query */
+    Pair<String, String> dbAndTable = Pair.of(dbName, tblName);
+    if (writeIdCache.containsKey(dbAndTable)) {
+      return writeIdCache.get(dbAndTable);
+    } else {
+      Optional<Long> writeId = getWriteIdFromDb(txnid, dbConn, dbName, tblName);
+      writeIdCache.put(dbAndTable, writeId);
+      return writeId;
+    }
+  }
+
+  private Optional<Long> getWriteIdFromDb(long txnid, Connection dbConn, String dbName, String tblName) throws SQLException {
+    if (tblName != null) {
+      // It is assumed the caller has already allocated a write id for adding/updating data to
+      // the acid tables. However, DDL operations won't allocate a write id and hence this query
+      // may return empty result sets.
+      // Get the write id allocated by this txn for the given table writes
+      try (PreparedStatement pstmt = dbConn.prepareStatement(SELECT_WRITE_ID_QUERY)) {
+        pstmt.setString(1, dbName);
+        pstmt.setString(2, tblName);
+        pstmt.setLong(3, txnid);
+        LOG.debug("Going to execute query <" + SELECT_WRITE_ID_QUERY + ">");
+        try (ResultSet rs = pstmt.executeQuery()) {
+          if (rs.next()) {
+            return Optional.of(rs.getLong(1));
+          }
+        }
+      }
+    }
+    return Optional.empty();
+  }
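getWriteId memoizes getWriteIdFromDb per (database, table) pair for the duration of one lock request, caching empty results too, so a request that covers many partitions of the same table performs a single lookup. A hypothetical, stripped-down illustration of the same pattern (the loader is a stand-in for the JDBC query):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.tuple.Pair;

final class WriteIdCacheDemo {
  private final Map<Pair<String, String>, Optional<Long>> cache = new HashMap<>();

  Optional<Long> writeIdFor(String db, String table) {
    // containsKey + get (rather than computeIfAbsent) mirrors the patch: the real
    // loader throws a checked SQLException, which a lambda could not propagate.
    Pair<String, String> key = Pair.of(db, table);
    if (cache.containsKey(key)) {
      return cache.get(key);
    }
    Optional<Long> value = loadFromDb(db, table); // stand-in for the JDBC lookup
    cache.put(key, value);
    return value;
  }

  private Optional<Long> loadFromDb(String db, String table) {
    return Optional.empty(); // placeholder: DDL-only txns legitimately have no write id
  }
}
```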
+
+  private boolean shouldUpdateTxnComponent(long txnid, LockRequest rqst, LockComponent lc) {
+    if (!lc.isSetOperationType()) {
+      //request came from old version of the client
+      return true; //this matches old behavior
+    }
+    switch (lc.getOperationType()) {
+      case INSERT:
+      case UPDATE:
+      case DELETE:
+        if (!lc.isSetIsDynamicPartitionWrite()) {
+          //must be old client talking, i.e. we don't know if it's DP so be conservative
+          return true;
+        }
+        /**
+         * we know this is part of DP operation and so we'll get
+         * {@link #addDynamicPartitions(AddDynamicPartitions)} call with the list
+         * of partitions actually changed.
+         */
+        return !lc.isIsDynamicPartitionWrite();
+      case SELECT:
+        return false;
+      case NO_TXN:
+        /* this constant is a bit of a misnomer since we now always have a txn context. It
+           just means the operation is such that we don't care what tables/partitions it
+           affected as it doesn't trigger a compaction or conflict detection. A better name
+           would be NON_TRANSACTIONAL. */
+        return false;
+      default:
+        //since we have an open transaction, only 4 values above are expected
+        throw new IllegalStateException("Unexpected DataOperationType: " + lc.getOperationType()
+            + " agentInfo=" + rqst.getAgentInfo() + " " + JavaUtils.txnIdToString(txnid));
+    }
+  }
+
+  private long insertHiveLocksWithTemporaryExtLockId(long txnid, Connection dbConn, LockRequest rqst) throws MetaException, SQLException {
+    long generatedExtLockId = generateTemporaryLockId();
+    String lastHB = isValidTxn(txnid) ? "0" : TxnDbUtil.getEpochFn(dbProduct);
+    String insertLocksQuery = String.format(HIVE_LOCKS_INSERT_QRY, lastHB);
+
+    int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
+    long intLockId = 0;
+
+    try (PreparedStatement pstmt = dbConn.prepareStatement(insertLocksQuery)) {
+      for (LockComponent lc : rqst.getComponent()) {
+        if (lc.isSetOperationType() && lc.getOperationType() == DataOperationType.UNSET &&
+            (MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) || MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST))) {
+          //Old versions of the thrift client should have (lc.isSetOperationType() == false), but they do not.
+          //If you add a default value to a variable, isSet() for that variable is true regardless of where the
+          //message was created (for object variables).
+          //It works correctly for boolean vars (e.g. LockComponent.isTransactional).
+          //In test mode, upgrades are not tested, so the thrift client and server versions always match;
+          //if we see UNSET here, it means something didn't set the appropriate value.
+          throw new IllegalStateException("Bug: operationType=" + lc.getOperationType() + " for component "
+              + lc + " agentInfo=" + rqst.getAgentInfo());
+        }
+        intLockId++;
+
+        pstmt.setLong(1, generatedExtLockId);
+        pstmt.setLong(2, intLockId);
+        pstmt.setLong(3, txnid);
+        pstmt.setString(4, normalizeCase(lc.getDbname()));
+        pstmt.setString(5, normalizeCase(lc.getTablename()));
+        pstmt.setString(6, normalizeCase(lc.getPartitionname()));
+        pstmt.setString(7, Character.toString(LOCK_WAITING));
+        pstmt.setString(8, Character.toString(getLockChar(lc.getType())));
+        pstmt.setString(9, rqst.getUser());
+        pstmt.setString(10, rqst.getHostname());
+        pstmt.setString(11, rqst.getAgentInfo());
+
+        pstmt.addBatch();
+        if (intLockId % batchSize == 0) {
+          LOG.debug("Executing HIVE_LOCKS inserts in batch. Batch size: " + batchSize);
+          pstmt.executeBatch();
+        }
+      }
+      if (intLockId % batchSize != 0) {
+        LOG.debug("Executing HIVE_LOCKS inserts in batch. Batch size: " + intLockId % batchSize);
+        pstmt.executeBatch();
+      }
+    }
+    return generatedExtLockId;
+  }
+
+  private long generateTemporaryLockId() {
+    return ThreadLocalRandom.current().nextLong(1000, Long.MAX_VALUE);
+  }
+
+  private char getLockChar(LockType lockType) {
+    switch (lockType) {
+      case EXCLUSIVE:
+        return LOCK_EXCLUSIVE;
+      case SHARED_READ:
+        return LOCK_SHARED;
+      case SHARED_WRITE:
+        return LOCK_SEMI_SHARED;
+      default:
+        return 'z';
+    }
+  }
+
   private static String normalizeCase(String s) {
     return s == null ? null : s.toLowerCase();
   }
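The temporary id from generateTemporaryLockId only has to avoid colliding with other in-flight temporary ids, and with real ids handed out from NEXT_LOCK_ID before the swap; drawing uniformly from [1000, Long.MAX_VALUE) makes that overwhelmingly unlikely. A back-of-the-envelope check (the concurrency figure is a deliberately pessimistic assumption):

```java
import java.util.concurrent.ThreadLocalRandom;

public class TempIdCollisionBound {
  public static void main(String[] args) {
    double range = (double) Long.MAX_VALUE - 1000; // ~9.2e18 possible temp ids
    long concurrent = 100_000;                     // pessimistic number of in-flight enqueues
    // birthday approximation: P(any collision) <= n^2 / (2 * N)
    double pCollision = (double) concurrent * concurrent / (2 * range);
    System.out.printf("upper bound on collision probability: %.3e%n", pCollision); // ~5e-10
    System.out.println("sample temp id: "
        + ThreadLocalRandom.current().nextLong(1000, Long.MAX_VALUE));
  }
}
```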
@@ -3271,7 +3324,6 @@ public void addDynamicPartitions(AddDynamicPartitions rqst)
       throws NoSuchTxnException, TxnAbortedException, MetaException {
     Connection dbConn = null;
     Statement stmt = null;
-    List<PreparedStatement> insertPreparedStmts = null;
     try {
       try {
         lockInternal();
@@ -3290,23 +3342,28 @@ public void addDynamicPartitions(AddDynamicPartitions rqst)
         }
         Long writeId = rqst.getWriteid();
-        List<String> rows = new ArrayList<>();
-        List<List<String>> paramsList = new ArrayList<>();
-        for (String partName : rqst.getPartitionnames()) {
-          rows.add(rqst.getTxnid() + ",?,?,?," + quoteChar(ot.sqlConst) + "," + writeId);
-          List<String> params = new ArrayList<>();
-          params.add(normalizeCase(rqst.getDbname()));
-          params.add(normalizeCase(rqst.getTablename()));
-          params.add(partName);
-          paramsList.add(params);
-        }
-        int modCount = 0;
-        //record partitions that were written to
-        insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
-            "\"TXN_COMPONENTS\" (\"TC_TXNID\", \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", \"TC_OPERATION_TYPE\", \"TC_WRITEID\")",
-            rows, paramsList);
-        for(PreparedStatement pst : insertPreparedStmts) {
-          modCount = pst.executeUpdate();
+        try (PreparedStatement pstmt = dbConn.prepareStatement(TXN_COMPONENTS_INSERT_QUERY)) {
+          int insertCounter = 0;
+          int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
+          for (String partName : rqst.getPartitionnames()) {
+            pstmt.setLong(1, rqst.getTxnid());
+            pstmt.setString(2, normalizeCase(rqst.getDbname()));
+            pstmt.setString(3, normalizeCase(rqst.getTablename()));
+            pstmt.setString(4, partName);
+            pstmt.setString(5, Character.toString(ot.sqlConst));
+            pstmt.setObject(6, writeId);
+
+            pstmt.addBatch();
+            insertCounter++;
+            if (insertCounter % batchSize == 0) {
+              LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + batchSize);
+              pstmt.executeBatch();
+            }
+          }
+          if (insertCounter % batchSize != 0) {
+            LOG.debug("Executing a batch of <" + TXN_COMPONENTS_INSERT_QUERY + "> queries. Batch size: " + insertCounter % batchSize);
+            pstmt.executeBatch();
+          }
         }
         LOG.debug("Going to commit");
         dbConn.commit();
@@ -3317,11 +3374,6 @@ public void addDynamicPartitions(AddDynamicPartitions rqst)
         throw new MetaException("Unable to insert into from transaction database " +
             StringUtils.stringifyException(e));
       } finally {
-        if (insertPreparedStmts != null) {
-          for(PreparedStatement pst : insertPreparedStmts) {
-            closeStmt(pst);
-          }
-        }
         close(null, stmt, dbConn);
         unlockInternal();
       }
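addDynamicPartitions now uses the same flush-every-N prepared-statement batching as the lock-enqueue path, with DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE as the batch size. The generic shape of that pattern, as a standalone sketch (hypothetical helper; on MySQL it combines with the rewriteBatchedStatements flag enabled in the first hunks):

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

final class BatchInsert {
  static void insertAll(Connection conn, String sql, List<String[]> rows, int batchSize)
      throws SQLException {
    try (PreparedStatement ps = conn.prepareStatement(sql)) {
      int count = 0;
      for (String[] row : rows) {
        for (int i = 0; i < row.length; i++) {
          ps.setString(i + 1, row[i]);  // bind every column of the row
        }
        ps.addBatch();
        if (++count % batchSize == 0) {
          ps.executeBatch();            // flush a full batch
        }
      }
      if (count % batchSize != 0) {
        ps.executeBatch();              // flush the remainder
      }
    }
  }
}
```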