diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java
index 333610d..748c170 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java
@@ -31,6 +31,7 @@
 import java.util.Properties;
 
 import static org.apache.hadoop.hive.metastore.DatabaseProduct.MYSQL;
+import static org.apache.hadoop.hive.metastore.DatabaseProduct.POSTGRES;
 import static org.apache.hadoop.hive.metastore.DatabaseProduct.determineDatabaseProduct;
 
 /**
@@ -70,6 +71,10 @@ public DataSource create(Configuration hdpConfig) throws SQLException {
 
     if (determineDatabaseProduct(driverUrl) == MYSQL) {
       config.setConnectionInitSql("SET @@session.sql_mode=ANSI_QUOTES");
+      config.addDataSourceProperty("rewriteBatchedStatements", true);
+    }
+    if (determineDatabaseProduct(driverUrl) == POSTGRES) {
+      config.addDataSourceProperty("reWriteBatchedInserts", true);
     }
     //https://github.com/brettwooldridge/HikariCP
     config.setConnectionTimeout(connectionTimeout);
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index efcf2e1..c59fce1 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -25,17 +25,22 @@
 import java.sql.SQLException;
 import java.sql.SQLTransactionRollbackException;
 import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.EnumMap;
 import java.util.Properties;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.DatabaseProduct;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
-import org.apache.zookeeper.txn.TxnHeader;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.hive.metastore.DatabaseProduct.*;
+
 /**
  * Utility methods for creating and destroying txn database/schema, plus methods for
  * querying against metastore tables.
@@ -43,9 +48,20 @@
  */
 public final class TxnDbUtil {
 
-  static final private Logger LOG = LoggerFactory.getLogger(TxnDbUtil.class.getName());
+  private static final Logger LOG = LoggerFactory.getLogger(TxnDbUtil.class.getName());
   private static final String TXN_MANAGER = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager";
 
+  private static final EnumMap<DatabaseProduct, String> DB_EPOCH_FN =
+      new EnumMap<DatabaseProduct, String>(DatabaseProduct.class) {{
+        put(DERBY, "{ fn timestampdiff(sql_tsi_frac_second, timestamp('" + new Timestamp(0) +
+            "'), current_timestamp) } / 1000000");
+        put(MYSQL, "round(unix_timestamp(curtime(4)) * 1000)");
+        put(POSTGRES, "round(extract(epoch from current_timestamp) * 1000)");
+        put(ORACLE, "(cast(systimestamp at time zone 'UTC' as date) - date '1970-01-01')*24*60*60*1000 " +
+            "+ cast(mod( extract( second from systimestamp ), 1 ) * 1000 as int)");
+        put(SQLSERVER, "datediff_big(millisecond, '19700101', sysutcdatetime())");
+      }};
+
   private static int deadlockCnt = 0;
 
   private TxnDbUtil() {
@@ -500,35 +516,6 @@ public static int countLockComponents(Configuration conf, long lockId) throws Ex
   }
 
   /**
-   * Return true if the transaction of the given txnId is open.
-   * @param conf HiveConf
-   * @param txnId transaction id to search for
-   * @return
-   * @throws Exception
-   */
-  public static boolean isOpenOrAbortedTransaction(Configuration conf, long txnId) throws Exception {
-    Connection conn = null;
-    PreparedStatement stmt = null;
-    ResultSet rs = null;
-    try {
-      conn = getConnection(conf);
-      conn.setAutoCommit(false);
-      conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
-
-      stmt = conn.prepareStatement("SELECT txn_id FROM TXNS WHERE txn_id = ?");
-      stmt.setLong(1, txnId);
-      rs = stmt.executeQuery();
-      if (!rs.next()) {
-        return false;
-      } else {
-        return true;
-      }
-    } finally {
-      closeResources(conn, stmt, rs);
-    }
-  }
-
-  /**
    * Utility method used to run COUNT queries like "select count(*) from ..." against metastore tables
    * @param countQuery countQuery text
    * @return count countQuery result
@@ -627,4 +614,19 @@ static void closeResources(Connection conn, Statement stmt, ResultSet rs) {
       }
     }
   }
+
+  /**
+   * Get database specific function which returns the milliseconds value after the epoch.
+   * @throws MetaException For unknown database type.
+   */
+  static String getEpochFn(DatabaseProduct dbProduct) throws MetaException {
+    String epochFn = DB_EPOCH_FN.get(dbProduct);
+    if (epochFn != null) {
+      return epochFn;
+    } else {
+      String msg = "Unknown database product: " + dbProduct.toString();
+      LOG.error(msg);
+      throw new MetaException(msg);
+    }
+  }
 }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 06defdb..74ef885 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -27,7 +27,6 @@
 import java.sql.SQLFeatureNotSupportedException;
 import java.sql.Savepoint;
 import java.sql.Statement;
-import java.sql.Timestamp;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -178,12 +177,18 @@
   static final protected char LOCK_SHARED = 'r';
   static final protected char LOCK_SEMI_SHARED = 'w';
 
-  static final private int ALLOWED_REPEATED_DEADLOCKS = 10;
-  static final private Logger LOG = LoggerFactory.getLogger(TxnHandler.class.getName());
+  private static final int ALLOWED_REPEATED_DEADLOCKS = 10;
+  private static final Logger LOG = LoggerFactory.getLogger(TxnHandler.class.getName());
 
-  static private DataSource connPool;
+  private static DataSource connPool;
   private static DataSource connPoolMutex;
-  static private boolean doRetryOnConnPool = false;
+  private static boolean doRetryOnConnPool = false;
+
+  // Query definitions
+  private static final String HIVE_LOCKS_INSERT_QRY = "INSERT INTO \"HIVE_LOCKS\" ( " +
+      "\"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_TXNID\", \"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\", " +
+      "\"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", \"HL_LAST_HEARTBEAT\", \"HL_USER\", \"HL_HOST\", \"HL_AGENT_INFO\") " +
+      "VALUES (?, ?, ?, ?, ?, ?, ?, ?, %s, ?, ?, ?)";
 
   private List<TransactionalMetaStoreEventListener> transactionalListeners;
 
@@ -611,11 +616,11 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException {
       params.add(rqst.getUser());
       params.add(rqst.getHostname());
       List<List<String>> paramsList = new ArrayList<>(numTxns);
-      String dbEpochString = getDbEpochString();
+
       for (long i = first; i < first + numTxns; i++) {
         txnIds.add(i);
-        rows.add(i + "," + quoteChar(TXN_OPEN) + "," + dbEpochString + "," + dbEpochString + ",?,?,"
-            + txnType.getValue());
+        rows.add(i + "," + quoteChar(TXN_OPEN) + "," + TxnDbUtil.getEpochFn(dbProduct) + "," +
+            TxnDbUtil.getEpochFn(dbProduct) + ",?,?," + txnType.getValue());
         paramsList.add(params);
       }
       insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
@@ -2520,73 +2525,59 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc
         }
         insertPreparedStmts = null;
       }
-      List<String> rows = new ArrayList<>();
-      List<List<String>> paramsList = new ArrayList<>();
+      String lastHB = isValidTxn(txnid) ? "0" : TxnDbUtil.getEpochFn(dbProduct);
+      String insertLocksQuery = String.format(HIVE_LOCKS_INSERT_QRY, lastHB);
+
+      int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE);
       long intLockId = 0;
-      String lastHBString = (isValidTxn(txnid) ? "0" : getDbEpochString());
"0" : getDbEpochString()); - for (LockComponent lc : rqst.getComponent()) { - if(lc.isSetOperationType() && lc.getOperationType() == DataOperationType.UNSET && - (MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) || MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST))) { - //old version of thrift client should have (lc.isSetOperationType() == false) but they do not - //If you add a default value to a variable, isSet() for that variable is true regardless of the where the - //message was created (for object variables. - // It works correctly for boolean vars, e.g. LockComponent.isTransactional). - //in test mode, upgrades are not tested, so client version and server version of thrift always matches so - //we see UNSET here it means something didn't set the appropriate value. - throw new IllegalStateException("Bug: operationType=" + lc.getOperationType() + " for component " - + lc + " agentInfo=" + rqst.getAgentInfo()); - } - intLockId++; - String dbName = normalizeCase(lc.getDbname()); - String tblName = normalizeCase(lc.getTablename()); - String partName = normalizeCase(lc.getPartitionname()); - LockType lockType = lc.getType(); - char lockChar = 'z'; - switch (lockType) { - case EXCLUSIVE: - lockChar = LOCK_EXCLUSIVE; - break; - case SHARED_READ: - lockChar = LOCK_SHARED; - break; - case SHARED_WRITE: - lockChar = LOCK_SEMI_SHARED; - break; - } - rows.add(extLockId + ", " + intLockId + "," + txnid + ", ?, " + - ((tblName == null) ? "null" : "?") + ", " + - ((partName == null) ? "null" : "?") + ", " + - quoteChar(LOCK_WAITING) + ", " + quoteChar(lockChar) + ", " + - //for locks associated with a txn, we always heartbeat txn and timeout based on that - lastHBString + ", " + - ((rqst.getUser() == null) ? "null" : "?") + ", " + - ((rqst.getHostname() == null) ? "null" : "?") + ", " + - ((rqst.getAgentInfo() == null) ? "null" : "?"));// + ")"; - List params = new ArrayList<>(); - params.add(dbName); - if (tblName != null) { - params.add(tblName); - } - if (partName != null) { - params.add(partName); - } - if (rqst.getUser() != null) { - params.add(rqst.getUser()); - } - if (rqst.getHostname() != null) { - params.add(rqst.getHostname()); + + try (PreparedStatement pstmt = dbConn.prepareStatement(insertLocksQuery)) { + for (LockComponent lc : rqst.getComponent()) { + if (lc.isSetOperationType() && lc.getOperationType() == DataOperationType.UNSET && + (MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) || MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST))) { + //Old version of thrift client should have (lc.isSetOperationType() == false), but they do not + //If you add a default value to a variable, isSet() for that variable is true regardless of the where the + //message was created (for object variables). + //It works correctly for boolean vars, e.g. LockComponent.isTransactional). + //in test mode, upgrades are not tested, so client version and server version of thrift always matches so + //we see UNSET here it means something didn't set the appropriate value. 
+            throw new IllegalStateException("Bug: operationType=" + lc.getOperationType() + " for component "
+                + lc + " agentInfo=" + rqst.getAgentInfo());
+          }
+          intLockId++;
+
+          char lockChar = 'z';
+          switch (lc.getType()) {
+            case EXCLUSIVE:
+              lockChar = LOCK_EXCLUSIVE;
+              break;
+            case SHARED_READ:
+              lockChar = LOCK_SHARED;
+              break;
+            case SHARED_WRITE:
+              lockChar = LOCK_SEMI_SHARED;
+              break;
+          }
+          pstmt.setLong(1, extLockId);
+          pstmt.setLong(2, intLockId);
+          pstmt.setLong(3, txnid);
+          pstmt.setString(4, normalizeCase(lc.getDbname()));
+          pstmt.setString(5, normalizeCase(lc.getTablename()));
+          pstmt.setString(6, normalizeCase(lc.getPartitionname()));
+          pstmt.setString(7, Character.toString(LOCK_WAITING));
+          pstmt.setString(8, Character.toString(lockChar));
+          pstmt.setString(9, rqst.getUser());
+          pstmt.setString(10, rqst.getHostname());
+          pstmt.setString(11, rqst.getAgentInfo());
+
+          pstmt.addBatch();
+          if (intLockId % batchSize == 0) {
+            pstmt.executeBatch();
+          }
         }
-        if (rqst.getAgentInfo() != null) {
-          params.add(rqst.getAgentInfo());
+        if (intLockId % batchSize != 0) {
+          pstmt.executeBatch();
         }
-        paramsList.add(params);
-      }
-      insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
-          "\"HIVE_LOCKS\" (\"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_TXNID\", \"HL_DB\", " +
-          "\"HL_TABLE\", \"HL_PARTITION\", \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", " +
-          "\"HL_LAST_HEARTBEAT\", \"HL_USER\", \"HL_HOST\", \"HL_AGENT_INFO\")", rows, paramsList);
-      for(PreparedStatement pst : insertPreparedStmts) {
-        int modCount = pst.executeUpdate();
       }
       dbConn.commit();
       success = true;
@@ -2961,7 +2952,7 @@ public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst
         txnIds.add(txn);
       }
       TxnUtils.buildQueryWithINClause(conf, queries,
-        new StringBuilder("UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + getDbEpochString() +
+        new StringBuilder("UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + TxnDbUtil.getEpochFn(dbProduct) +
           " WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN) + " AND "),
         new StringBuilder(""), txnIds, "\"TXN_ID\"", true, false);
       int updateCnt = 0;
@@ -3926,37 +3917,6 @@ protected void checkRetryable(Connection conn,
   }
 
   /**
-   * Returns the database specific query string representation which will return the milliseconds
-   * value after epoch.
-   * @return The string which will insert the current timestamp milliseconds value
-   * @throws MetaException For unknown database type
-   */
-  private static String epochInCurrentTimezone = null;
-  protected String getDbEpochString() throws MetaException {
-    switch (dbProduct) {
-    case DERBY:
-      if (epochInCurrentTimezone == null) {
-        epochInCurrentTimezone = new Timestamp(0).toString();
-      }
-      return "{ fn timestampdiff(sql_tsi_frac_second, timestamp('" + epochInCurrentTimezone +
-          "'), current_timestamp) } / 1000000";
-    case MYSQL:
-      return "round(unix_timestamp(curtime(4)) * 1000)";
-    case POSTGRES:
-      return "round(extract(epoch from current_timestamp) * 1000)";
-    case ORACLE:
-      return "(cast(systimestamp at time zone 'UTC' as date) - date '1970-01-01')*24*60*60*1000 " +
-          "+ cast(mod( extract( second from systimestamp ), 1 ) * 1000 as int)";
-    case SQLSERVER:
-      return "datediff_big(millisecond, '19700101', sysutcdatetime())";
-    default:
-      String msg = "Unknown database product: " + dbProduct.toString();
-      LOG.error(msg);
-      throw new MetaException(msg);
-    }
-  }
-
-  /**
    * Determine the current time, using the RDBMS as a source of truth
    * @param conn database connection
    * @return current time in milliseconds
@@ -4276,7 +4236,8 @@ private int abortTxns(Connection dbConn, List<Long> txnids, boolean checkHeartbe
     prefix.append("UPDATE \"TXNS\" SET \"TXN_STATE\" = " + quoteChar(TXN_ABORTED) +
       " WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN) + " AND ");
     if(checkHeartbeat) {
-      suffix.append(" AND \"TXN_LAST_HEARTBEAT\" < ").append(getDbEpochString()).append("-").append(timeout);
+      suffix.append(" AND \"TXN_LAST_HEARTBEAT\" < ");
+      suffix.append(TxnDbUtil.getEpochFn(dbProduct)).append("-").append(timeout);
     }
 
     TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"TXN_ID\"", true, false);
@@ -4524,8 +4485,9 @@ private void acquire(Connection dbConn, Statement stmt, List<LockInfo> locksBein
     long extLockId = locksBeingChecked.get(0).extLockId;
     String s = "UPDATE \"HIVE_LOCKS\" SET \"HL_LOCK_STATE\" = '" + LOCK_ACQUIRED + "', " +
       //if lock is part of txn, heartbeat info is in txn record
-      "\"HL_LAST_HEARTBEAT\" = " + (isValidTxn(txnId) ? 0 : getDbEpochString()) +
-      ", \"HL_ACQUIRED_AT\" = " + getDbEpochString() + ",\"HL_BLOCKEDBY_EXT_ID\"=NULL,\"HL_BLOCKEDBY_INT_ID\"=NULL" +
+      "\"HL_LAST_HEARTBEAT\" = " + (isValidTxn(txnId) ? 0 : TxnDbUtil.getEpochFn(dbProduct)) +
+      ",\"HL_ACQUIRED_AT\" = " + TxnDbUtil.getEpochFn(dbProduct) +
+      ",\"HL_BLOCKEDBY_EXT_ID\"=NULL,\"HL_BLOCKEDBY_INT_ID\"=NULL" +
       " WHERE \"HL_LOCK_EXT_ID\" = " + extLockId;
     LOG.debug("Going to execute update <" + s + ">");
     int rc = stmt.executeUpdate(s);
@@ -4570,7 +4532,7 @@ private void heartbeatLock(Connection dbConn, long extLockId)
       stmt = dbConn.createStatement();
       String s = "UPDATE \"HIVE_LOCKS\" SET \"HL_LAST_HEARTBEAT\" = " +
-        getDbEpochString() + " WHERE \"HL_LOCK_EXT_ID\" = " + extLockId;
+        TxnDbUtil.getEpochFn(dbProduct) + " WHERE \"HL_LOCK_EXT_ID\" = " + extLockId;
       LOG.debug("Going to execute update <" + s + ">");
       int rc = stmt.executeUpdate(s);
       if (rc < 1) {
@@ -4593,7 +4555,7 @@ private void heartbeatTxn(Connection dbConn, long txnid)
     Statement stmt = null;
     try {
       stmt = dbConn.createStatement();
-      String s = "UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + getDbEpochString() +
+      String s = "UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + TxnDbUtil.getEpochFn(dbProduct) +
         " WHERE \"TXN_ID\" = " + txnid + " AND \"TXN_STATE\" = '" + TXN_OPEN + "'";
       LOG.debug("Going to execute update <" + s + ">");
       int rc = stmt.executeUpdate(s);
@@ -4849,8 +4811,8 @@ private void timeOutLocks(Connection dbConn) {
       stmt = dbConn.createStatement();
       //doing a SELECT first is less efficient but makes it easier to debug things
       String s = "SELECT DISTINCT \"HL_LOCK_EXT_ID\" FROM \"HIVE_LOCKS\" WHERE \"HL_LAST_HEARTBEAT\" < " +
-        getDbEpochString() + "-" + timeout + " AND \"HL_TXNID\" = 0"; //when txnid is <> 0, the lock is
-      //associated with a txn and is handled by performTimeOuts()
+        TxnDbUtil.getEpochFn(dbProduct) + "-" + timeout + " AND \"HL_TXNID\" = 0";
+      //when txnid is <> 0, the lock is associated with a txn and is handled by performTimeOuts()
       //want to avoid expiring locks for a txn w/o expiring the txn itself
       List<Long> extLockIDs = new ArrayList<>();
       rs = stmt.executeQuery(s);
@@ -4870,7 +4832,7 @@ private void timeOutLocks(Connection dbConn) {
 
       //include same hl_last_heartbeat condition in case someone heartbeated since the select
       prefix.append("DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_LAST_HEARTBEAT\" < ");
-      prefix.append(getDbEpochString()).append("-").append(timeout);
+      prefix.append(TxnDbUtil.getEpochFn(dbProduct)).append("-").append(timeout);
       prefix.append(" AND \"HL_TXNID\" = 0 AND ");
 
       TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, extLockIDs,
         "\"HL_LOCK_EXT_ID\"", true, false);
@@ -4927,7 +4889,7 @@ public void performTimeOuts() {
       while(true) {
         stmt = dbConn.createStatement();
         String s = " \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = '" + TXN_OPEN +
-          "' AND \"TXN_LAST_HEARTBEAT\" < " + getDbEpochString() + "-" + timeout +
+          "' AND \"TXN_LAST_HEARTBEAT\" < " + TxnDbUtil.getEpochFn(dbProduct) + "-" + timeout +
           " AND \"TXN_TYPE\" != " + TxnType.REPL_CREATED.getValue();
         //safety valve for extreme cases
         s = sqlGenerator.addLimitClause(10 * TIMED_OUT_TXN_ABORT_BATCH_SIZE, s);
@@ -5099,13 +5061,6 @@ private boolean isDuplicateKeyError(SQLException ex) {
   private static String getMessage(SQLException ex) {
     return ex.getMessage() + " (SQLState=" + ex.getSQLState() + ", ErrorCode=" + ex.getErrorCode() + ")";
   }
-  /**
-   * Useful for building SQL strings
-   * @param value may be {@code null}
-   */
-  private static String valueOrNullLiteral(String value) {
-    return value == null ? "null" : quoteString(value);
-  }
   static String quoteString(String input) {
     return "'" + input + "'";
   }
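
Reviewer note: the two HikariCP properties added in HikariCPDataSourceProvider are driver hints, not pool settings; they only matter because they are forwarded to the underlying JDBC driver, where they let the driver collapse a JDBC batch into a multi-row INSERT. A minimal sketch of the same wiring in isolation, assuming a plain HikariCP setup (the URL handling, class and method names here are illustrative, not part of this patch):

```java
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

public class BatchRewriteExample {
  // Hypothetical helper: builds a pool with driver-side batch rewriting enabled.
  public static HikariDataSource createPool(String jdbcUrl, String user, String password) {
    HikariConfig config = new HikariConfig();
    config.setJdbcUrl(jdbcUrl);
    config.setUsername(user);
    config.setPassword(password);

    if (jdbcUrl.startsWith("jdbc:mysql:")) {
      // MySQL Connector/J: rewrite a batch of INSERTs into one multi-row INSERT.
      config.addDataSourceProperty("rewriteBatchedStatements", true);
    } else if (jdbcUrl.startsWith("jdbc:postgresql:")) {
      // PostgreSQL JDBC driver: rewrite batched INSERTs into multi-value statements.
      config.addDataSourceProperty("reWriteBatchedInserts", true);
    }
    return new HikariDataSource(config);
  }
}
```

With these flags set, the executeBatch() calls introduced in TxnHandler become multi-row inserts on the server side instead of one round trip per HIVE_LOCKS row.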
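The new TxnDbUtil.getEpochFn(DatabaseProduct) centralizes the per-RDBMS "milliseconds since epoch" SQL fragment that TxnHandler previously built in getDbEpochString(); callers splice the fragment into their statements so the database clock, not the JVM clock, supplies heartbeat values. A hedged sketch of such a caller, assuming it compiles inside the org.apache.hadoop.hive.metastore.txn package (getEpochFn is package-private); the class, method name and dropped state check are illustrative, not from the patch:

```java
// Sketch only: lives in the same package as TxnDbUtil because getEpochFn is package-private.
package org.apache.hadoop.hive.metastore.txn;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

import org.apache.hadoop.hive.metastore.DatabaseProduct;
import org.apache.hadoop.hive.metastore.api.MetaException;

public class EpochFnExample {
  // Heartbeats one transaction using the database's own clock, mirroring heartbeatTxn() above.
  static int touchTxn(Connection dbConn, DatabaseProduct dbProduct, long txnId)
      throws SQLException, MetaException {
    // e.g. "round(extract(epoch from current_timestamp) * 1000)" when dbProduct is POSTGRES
    String epochMillis = TxnDbUtil.getEpochFn(dbProduct);
    String sql = "UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + epochMillis +
        " WHERE \"TXN_ID\" = " + txnId;
    try (Statement stmt = dbConn.createStatement()) {
      return stmt.executeUpdate(sql);
    }
  }
}
```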
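The enqueueLockWithRetry() rewrite is where the driver settings pay off: instead of generating multi-row INSERT strings through createInsertValuesPreparedStmt, it binds every HIVE_LOCKS row to one PreparedStatement and flushes with executeBatch() every DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE rows. Note that HL_LAST_HEARTBEAT is not a bind variable; the %s in HIVE_LOCKS_INSERT_QRY is filled with the epoch SQL fragment via String.format before the statement is prepared, so the database evaluates it server-side. A self-contained sketch of the same flush-on-modulo batching pattern against a hypothetical table (table and column names are assumptions, not from the patch):

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

public class BatchInsertExample {
  // Inserts all values, flushing a JDBC batch every batchSize rows,
  // the same pattern the patch uses for HIVE_LOCKS.
  static void insertAll(Connection conn, List<String> names, int batchSize) throws SQLException {
    String sql = "INSERT INTO \"EXAMPLE_TBL\" (\"ID\", \"NAME\") VALUES (?, ?)";
    try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
      long rowCount = 0;
      for (String name : names) {
        rowCount++;
        pstmt.setLong(1, rowCount);
        pstmt.setString(2, name);
        pstmt.addBatch();
        if (rowCount % batchSize == 0) {
          pstmt.executeBatch();   // flush a full batch
        }
      }
      if (rowCount % batchSize != 0) {
        pstmt.executeBatch();     // flush the remainder
      }
    }
  }
}
```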