diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java index 333610d..748c170 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java @@ -31,6 +31,7 @@ import java.util.Properties; import static org.apache.hadoop.hive.metastore.DatabaseProduct.MYSQL; +import static org.apache.hadoop.hive.metastore.DatabaseProduct.POSTGRES; import static org.apache.hadoop.hive.metastore.DatabaseProduct.determineDatabaseProduct; /** @@ -70,6 +71,10 @@ public DataSource create(Configuration hdpConfig) throws SQLException { if (determineDatabaseProduct(driverUrl) == MYSQL) { config.setConnectionInitSql("SET @@session.sql_mode=ANSI_QUOTES"); + config.addDataSourceProperty("rewriteBatchedStatements", true); + } + if (determineDatabaseProduct(driverUrl) == POSTGRES) { + config.addDataSourceProperty("reWriteBatchedInserts", true); } //https://github.com/brettwooldridge/HikariCP config.setConnectionTimeout(connectionTimeout); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 2995afa..ff80ac9 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -2506,74 +2506,66 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc } insertPreparedStmts = null; } - List rows = new ArrayList<>(); - List> paramsList = new ArrayList<>(); - long intLockId = 0; - long lastHB = (isValidTxn(txnid) ? 0 : getDbTime(dbConn)); - for (LockComponent lc : rqst.getComponent()) { - if(lc.isSetOperationType() && lc.getOperationType() == DataOperationType.UNSET && - (MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) || MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST))) { - //old version of thrift client should have (lc.isSetOperationType() == false) but they do not - //If you add a default value to a variable, isSet() for that variable is true regardless of the where the - //message was created (for object variables. - // It works correctly for boolean vars, e.g. LockComponent.isTransactional). - //in test mode, upgrades are not tested, so client version and server version of thrift always matches so - //we see UNSET here it means something didn't set the appropriate value. - throw new IllegalStateException("Bug: operationType=" + lc.getOperationType() + " for component " - + lc + " agentInfo=" + rqst.getAgentInfo()); - } - intLockId++; - String dbName = normalizeCase(lc.getDbname()); - String tblName = normalizeCase(lc.getTablename()); - String partName = normalizeCase(lc.getPartitionname()); - LockType lockType = lc.getType(); - char lockChar = 'z'; - switch (lockType) { - case EXCLUSIVE: - lockChar = LOCK_EXCLUSIVE; - break; - case SHARED_READ: - lockChar = LOCK_SHARED; - break; - case SHARED_WRITE: - lockChar = LOCK_SEMI_SHARED; - break; - } - rows.add(extLockId + ", " + intLockId + "," + txnid + ", ?, " + - ((tblName == null) ? "null" : "?") + ", " + - ((partName == null) ? "null" : "?") + ", " + - quoteChar(LOCK_WAITING) + ", " + quoteChar(lockChar) + ", " + - //for locks associated with a txn, we always heartbeat txn and timeout based on that - lastHB + ", " + - ((rqst.getUser() == null) ? "null" : "?") + ", " + - ((rqst.getHostname() == null) ? "null" : "?") + ", " + - ((rqst.getAgentInfo() == null) ? "null" : "?"));// + ")"; - List params = new ArrayList<>(); - params.add(dbName); - if (tblName != null) { - params.add(tblName); - } - if (partName != null) { - params.add(partName); - } - if (rqst.getUser() != null) { - params.add(rqst.getUser()); - } - if (rqst.getHostname() != null) { - params.add(rqst.getHostname()); + + try (PreparedStatement pstmt = dbConn.prepareStatement( + "INSERT INTO \"HIVE_LOCKS\" (" + + " \"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_TXNID\", \"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\"," + + " \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", \"HL_LAST_HEARTBEAT\", \"HL_USER\", \"HL_HOST\", \"HL_AGENT_INFO\")" + + " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")) { + + long intLockId = 0; + long lastHB = (isValidTxn(txnid) ? 0 : getDbTime(dbConn)); + + for (LockComponent lc : rqst.getComponent()) { + if (lc.isSetOperationType() && lc.getOperationType() == DataOperationType.UNSET && + (MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) || MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST))) { + //old version of thrift client should have (lc.isSetOperationType() == false) but they do not + //If you add a default value to a variable, isSet() for that variable is true regardless of the where the + //message was created (for object variables. + // It works correctly for boolean vars, e.g. LockComponent.isTransactional). + //in test mode, upgrades are not tested, so client version and server version of thrift always matches so + //we see UNSET here it means something didn't set the appropriate value. + throw new IllegalStateException("Bug: operationType=" + lc.getOperationType() + " for component " + + lc + " agentInfo=" + rqst.getAgentInfo()); + } + intLockId++; + + char lockChar = 'z'; + switch (lc.getType()) { + case EXCLUSIVE: + lockChar = LOCK_EXCLUSIVE; + break; + case SHARED_READ: + lockChar = LOCK_SHARED; + break; + case SHARED_WRITE: + lockChar = LOCK_SEMI_SHARED; + break; + } + + pstmt.setLong(1, extLockId); + pstmt.setLong(2, intLockId); + pstmt.setLong(3, txnid); + pstmt.setString(4, normalizeCase(lc.getDbname())); + pstmt.setString(5, normalizeCase(lc.getTablename())); + pstmt.setString(6, normalizeCase(lc.getPartitionname())); + pstmt.setString(7, Character.toString(LOCK_WAITING)); + pstmt.setString(8, Character.toString(lockChar)); + pstmt.setLong(9, lastHB); + pstmt.setString(10, rqst.getUser()); + pstmt.setString(11, rqst.getHostname()); + pstmt.setString(12, rqst.getAgentInfo()); + + pstmt.addBatch(); + if (intLockId % MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE) == 0) { + pstmt.executeBatch(); + } } - if (rqst.getAgentInfo() != null) { - params.add(rqst.getAgentInfo()); + if (!rqst.getComponent().isEmpty()) { + pstmt.executeBatch(); } - paramsList.add(params); - } - insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, - "\"HIVE_LOCKS\" (\"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_TXNID\", \"HL_DB\", " + - "\"HL_TABLE\", \"HL_PARTITION\", \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", " + - "\"HL_LAST_HEARTBEAT\", \"HL_USER\", \"HL_HOST\", \"HL_AGENT_INFO\")", rows, paramsList); - for(PreparedStatement pst : insertPreparedStmts) { - int modCount = pst.executeUpdate(); } + dbConn.commit(); success = true; return new ConnectionLockIdPair(dbConn, extLockId);