diff --git standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 2995afad23..3bd06d31e2 100644 --- standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -27,6 +27,7 @@ import java.sql.SQLFeatureNotSupportedException; import java.sql.Savepoint; import java.sql.Statement; +import java.sql.Timestamp; import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; @@ -606,7 +607,6 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); - long now = getDbTime(dbConn); List txnIds = new ArrayList<>(numTxns); List rows = new ArrayList<>(); @@ -614,9 +614,10 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { params.add(rqst.getUser()); params.add(rqst.getHostname()); List> paramsList = new ArrayList<>(numTxns); + String dbEpochString = getDbEpochString(); for (long i = first; i < first + numTxns; i++) { txnIds.add(i); - rows.add(i + "," + quoteChar(TXN_OPEN) + "," + now + "," + now + ",?,?," + txnType.getValue()); + rows.add(i + "," + quoteChar(TXN_OPEN) + "," + dbEpochString + "," + dbEpochString + ",?,?," + txnType.getValue()); paramsList.add(params); } insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn, @@ -2509,7 +2510,7 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc List rows = new ArrayList<>(); List> paramsList = new ArrayList<>(); long intLockId = 0; - long lastHB = (isValidTxn(txnid) ? 0 : getDbTime(dbConn)); + String lastHBString = (isValidTxn(txnid) ? "0" : getDbEpochString()); for (LockComponent lc : rqst.getComponent()) { if(lc.isSetOperationType() && lc.getOperationType() == DataOperationType.UNSET && (MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) || MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST))) { @@ -2544,7 +2545,7 @@ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuc ((partName == null) ? "null" : "?") + ", " + quoteChar(LOCK_WAITING) + ", " + quoteChar(lockChar) + ", " + //for locks associated with a txn, we always heartbeat txn and timeout based on that - lastHB + ", " + + lastHBString + ", " + ((rqst.getUser() == null) ? "null" : "?") + ", " + ((rqst.getHostname() == null) ? "null" : "?") + ", " + ((rqst.getAgentInfo() == null) ? "null" : "?"));// + ")"; @@ -2947,7 +2948,7 @@ public HeartbeatTxnRangeResponse heartbeatTxnRange(HeartbeatTxnRangeRequest rqst txnIds.add(txn); } TxnUtils.buildQueryWithINClause(conf, queries, - new StringBuilder("UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + getDbTime(dbConn) + + new StringBuilder("UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + getDbEpochString() + " WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN) + " AND "), new StringBuilder(""), txnIds, "\"TXN_ID\"", true, false); int updateCnt = 0; @@ -3911,6 +3912,34 @@ protected void checkRetryable(Connection conn, } } + /** + * Returns the query string representation which will returns the milliseconds value after epoch + * @return The string which will insert the current timestamp milliseconds value + * @throws MetaException + */ + private static String epochInCurrentTimezone = null; + protected String getDbEpochString() throws MetaException { + switch (dbProduct) { + case DERBY: + if (epochInCurrentTimezone == null) { + epochInCurrentTimezone = new Timestamp(0).toString(); + } + return "{ fn timestampdiff(sql_tsi_frac_second, timestamp('" + epochInCurrentTimezone + "'), current_timestamp) } / 1000000"; + case MYSQL: + return "round(unix_timestamp(curtime(4)) * 1000)"; + case POSTGRES: + return "round(extract(epoch from current_timestamp) * 1000)"; + case ORACLE: + return "(cast(systimestamp at time zone 'UTC' as date) - date '1970-01-01')*24*60*60*1000 + cast(mod( extract( second from systimestamp ), 1 ) * 1000 as int)"; + case SQLSERVER: + return "datediff_big(millisecond, '19700101', sysutcdatetime())"; + default: + String msg = "Unknown database product: " + dbProduct.toString(); + LOG.error(msg); + throw new MetaException(msg); + } + } + /** * Determine the current time, using the RDBMS as a source of truth * @param conn database connection @@ -4192,8 +4221,8 @@ private void checkQFileTestHack() { } } - private int abortTxns(Connection dbConn, List txnids, boolean isStrict) throws SQLException { - return abortTxns(dbConn, txnids, -1, isStrict); + private int abortTxns(Connection dbConn, List txnids, boolean isStrict) throws SQLException, MetaException { + return abortTxns(dbConn, txnids, false, isStrict); } /** * TODO: expose this as an operation to client. Useful for streaming API to abort all remaining @@ -4203,8 +4232,8 @@ private int abortTxns(Connection dbConn, List txnids, boolean isStrict) th * * @param dbConn An active connection * @param txnids list of transactions to abort - * @param max_heartbeat value used by {@link #performTimeOuts()} to ensure this doesn't Abort txn which were - * hearbetated after #performTimeOuts() select and this operation. + * @param checkHeartbeat value used by {@link #performTimeOuts()} to ensure this doesn't Abort txn which were + * heartbeated after #performTimeOuts() select and this operation. * @param isStrict true for strict mode, false for best-effort mode. * In strict mode, if all txns are not successfully aborted, then the count of * updated ones will be returned and the caller will roll back. @@ -4212,8 +4241,8 @@ private int abortTxns(Connection dbConn, List txnids, boolean isStrict) th * @return Number of aborted transactions * @throws SQLException */ - private int abortTxns(Connection dbConn, List txnids, long max_heartbeat, boolean isStrict) - throws SQLException { + private int abortTxns(Connection dbConn, List txnids, boolean checkHeartbeat, boolean isStrict) + throws SQLException, MetaException { Statement stmt = null; int updateCnt = 0; if (txnids.isEmpty()) { @@ -4230,10 +4259,8 @@ private int abortTxns(Connection dbConn, List txnids, long max_heartbeat, prefix.append("UPDATE \"TXNS\" SET \"TXN_STATE\" = " + quoteChar(TXN_ABORTED) + " WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN) + " AND "); - if(max_heartbeat > 0) { - suffix.append(" AND \"TXN_LAST_HEARTBEAT\" < ").append(max_heartbeat); - } else { - suffix.append(""); + if(checkHeartbeat) { + suffix.append(" AND \"TXN_LAST_HEARTBEAT\" < ").append(getDbEpochString()).append("-").append(timeout); } TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"TXN_ID\"", true, false); @@ -4252,7 +4279,6 @@ private int abortTxns(Connection dbConn, List txnids, long max_heartbeat, suffix.setLength(0); prefix.append("DELETE FROM \"MIN_HISTORY_LEVEL\" WHERE "); - suffix.append(""); TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"MHL_TXNID\"", false, false); @@ -4278,7 +4304,6 @@ private int abortTxns(Connection dbConn, List txnids, long max_heartbeat, suffix.setLength(0); prefix.append("DELETE FROM \"HIVE_LOCKS\" WHERE "); - suffix.append(""); TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"HL_TXNID\"", false, false); @@ -4563,11 +4588,10 @@ private void acquire(Connection dbConn, Statement stmt, List locksBein } long txnId = locksBeingChecked.get(0).txnId; long extLockId = locksBeingChecked.get(0).extLockId; - long now = getDbTime(dbConn); String s = "UPDATE \"HIVE_LOCKS\" SET \"HL_LOCK_STATE\" = '" + LOCK_ACQUIRED + "', " + //if lock is part of txn, heartbeat info is in txn record - "\"HL_LAST_HEARTBEAT\" = " + (isValidTxn(txnId) ? 0 : now) + - ", \"HL_ACQUIRED_AT\" = " + now + ",\"HL_BLOCKEDBY_EXT_ID\"=NULL,\"HL_BLOCKEDBY_INT_ID\"=NULL" + + "\"HL_LAST_HEARTBEAT\" = " + (isValidTxn(txnId) ? 0 : getDbEpochString()) + + ", \"HL_ACQUIRED_AT\" = " + getDbEpochString() + ",\"HL_BLOCKEDBY_EXT_ID\"=NULL,\"HL_BLOCKEDBY_INT_ID\"=NULL" + " WHERE \"HL_LOCK_EXT_ID\" = " + extLockId; LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); @@ -4641,10 +4665,9 @@ private void heartbeatLock(Connection dbConn, long extLockId) Statement stmt = null; try { stmt = dbConn.createStatement(); - long now = getDbTime(dbConn); String s = "UPDATE \"HIVE_LOCKS\" SET \"HL_LAST_HEARTBEAT\" = " + - now + " WHERE \"HL_LOCK_EXT_ID\" = " + extLockId; + getDbEpochString() + " WHERE \"HL_LOCK_EXT_ID\" = " + extLockId; LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); if (rc < 1) { @@ -4667,8 +4690,7 @@ private void heartbeatTxn(Connection dbConn, long txnid) Statement stmt = null; try { stmt = dbConn.createStatement(); - long now = getDbTime(dbConn); - String s = "UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + now + + String s = "UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + getDbEpochString() + " WHERE \"TXN_ID\" = " + txnid + " AND \"TXN_STATE\" = '" + TXN_OPEN + "'"; LOG.debug("Going to execute update <" + s + ">"); int rc = stmt.executeUpdate(s); @@ -4917,15 +4939,14 @@ private LockInfo getTxnIdFromLockId(Connection dbConn, long extLockId) // for read-only autoCommit=true statements. This does a commit, // and thus should be done before any calls to heartbeat that will leave // open transactions. - private void timeOutLocks(Connection dbConn, long now) { + private void timeOutLocks(Connection dbConn) { Statement stmt = null; ResultSet rs = null; try { stmt = dbConn.createStatement(); - long maxHeartbeatTime = now - timeout; //doing a SELECT first is less efficient but makes it easier to debug things String s = "SELECT DISTINCT \"HL_LOCK_EXT_ID\" FROM \"HIVE_LOCKS\" WHERE \"HL_LAST_HEARTBEAT\" < " + - maxHeartbeatTime + " AND \"HL_TXNID\" = 0";//when txnid is <> 0, the lock is + getDbEpochString() + "-" + timeout + " AND \"HL_TXNID\" = 0";//when txnid is <> 0, the lock is //associated with a txn and is handled by performTimeOuts() //want to avoid expiring locks for a txn w/o expiring the txn itself List extLockIDs = new ArrayList<>(); @@ -4946,9 +4967,8 @@ private void timeOutLocks(Connection dbConn, long now) { //include same hl_last_heartbeat condition in case someone heartbeated since the select prefix.append("DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_LAST_HEARTBEAT\" < "); - prefix.append(maxHeartbeatTime); + prefix.append(getDbEpochString()).append("-").append(timeout); prefix.append(" AND \"HL_TXNID\" = 0 AND "); - suffix.append(""); TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, extLockIDs, "\"HL_LOCK_EXT_ID\"", true, false); @@ -4960,7 +4980,7 @@ private void timeOutLocks(Connection dbConn, long now) { if(deletedLocks > 0) { Collections.sort(extLockIDs);//easier to read logs LOG.info("Deleted " + deletedLocks + " int locks from HIVE_LOCKS due to timeout (" + - "HL_LOCK_EXT_ID list: " + extLockIDs + ") maxHeartbeatTime=" + maxHeartbeatTime); + "HL_LOCK_EXT_ID list: " + extLockIDs + ")"); } LOG.debug("Going to commit"); dbConn.commit(); @@ -5000,12 +5020,11 @@ public void performTimeOuts() { //is made, in which case heartbeat will succeed but txn will still be Aborted. //Solving this corner case is not worth the perf penalty. The client should heartbeat in a //timely way. - long now = getDbTime(dbConn); - timeOutLocks(dbConn, now); + timeOutLocks(dbConn); while(true) { stmt = dbConn.createStatement(); String s = " \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = '" + TXN_OPEN + - "' AND \"TXN_LAST_HEARTBEAT\" < " + (now - timeout) + " AND \"TXN_TYPE\" != " + TxnType.REPL_CREATED.getValue(); + "' AND \"TXN_LAST_HEARTBEAT\" < " + getDbEpochString() + "-" + timeout + " AND \"TXN_TYPE\" != " + TxnType.REPL_CREATED.getValue(); //safety valve for extreme cases s = sqlGenerator.addLimitClause(10 * TIMED_OUT_TXN_ABORT_BATCH_SIZE, s); LOG.debug("Going to execute query <" + s + ">"); @@ -5027,7 +5046,7 @@ public void performTimeOuts() { close(rs, stmt, null); int numTxnsAborted = 0; for(List batchToAbort : timedOutTxns) { - if(abortTxns(dbConn, batchToAbort, now - timeout, true) == batchToAbort.size()) { + if(abortTxns(dbConn, batchToAbort, true, true) == batchToAbort.size()) { dbConn.commit(); numTxnsAborted += batchToAbort.size(); //todo: add TXNS.COMMENT filed and set it to 'aborted by system due to timeout' @@ -5046,10 +5065,9 @@ public void performTimeOuts() { LOG.info("Aborted " + numTxnsAborted + " transactions due to timeout"); } } catch (SQLException ex) { - LOG.warn("Aborting timedout transactions failed due to " + getMessage(ex), ex); - } - catch(MetaException e) { - LOG.warn("Aborting timedout transactions failed due to " + e.getMessage(), e); + LOG.warn("Aborting timed out transactions failed due to " + getMessage(ex), ex); + } catch(MetaException e) { + LOG.warn("Aborting timed out transactions failed due to " + e.getMessage(), e); } finally { close(rs, stmt, dbConn);