diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 1ea3541..34cb2ca 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -168,7 +168,7 @@ public CompactionInfo findNextToCompact(String workerId) throws MetaException { } // Now, update this record as being worked on by this worker. - long now = System.currentTimeMillis(); + long now = getDbTime(dbConn); s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " + "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id; LOG.debug("Going to execute update <" + s + ">"); @@ -485,7 +485,7 @@ public void revokeFromLocalWorkers(String hostname) throws MetaException { public void revokeTimedoutWorkers(long timeout) throws MetaException { try { Connection dbConn = getDbConn(); - long latestValidStart = System.currentTimeMillis() - timeout; + long latestValidStart = getDbTime(dbConn) - timeout; try { Statement stmt = dbConn.createStatement(); String s = "update COMPACTION_QUEUE set cq_worker_id = null, cq_start = null, cq_state = '" diff --git metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 631f3e6..412eb28 100644 --- metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -74,6 +74,7 @@ */ protected int deadlockCnt; protected HiveConf conf; + protected DatabaseProduct dbProduct; // Transaction timeout, in milliseconds. private long timeout; @@ -253,7 +254,7 @@ public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException { String s = "update NEXT_TXN_ID set ntxn_next = " + (first + numTxns); LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); - long now = System.currentTimeMillis(); + long now = getDbTime(dbConn); s = "insert into TXNS (txn_id, txn_state, txn_started, " + "txn_last_heartbeat, txn_user, txn_host) values (?, 'o', " + now + ", " + now + ", '" + rqst.getUser() + "', '" + rqst.getHostname() + "')"; @@ -443,7 +444,7 @@ public LockResponse checkLock(CheckLockRequest rqst) heartbeatLock(dbConn, extLockId); long txnid = getTxnIdFromLockId(dbConn, extLockId); if (txnid > 0) heartbeatTxn(dbConn, txnid); - return checkLock(dbConn, extLockId, txnid, true); + return checkLock(dbConn, extLockId, true); } catch (SQLException e) { try { LOG.debug("Going to rollback"); @@ -836,6 +837,90 @@ protected void detectDeadlock(SQLException e, String caller) throws DeadlockExce } } + /** + * Determine the current time, using the RDBMS as a source of truth + * @param conn database connection + * @return current time in milliseconds + * @throws org.apache.hadoop.hive.metastore.api.MetaException if the time cannot be determined + */ + protected long getDbTime(Connection conn) throws MetaException { + try { + Statement stmt = conn.createStatement(); + String s; + ResultSet rs; + DatabaseProduct prod = determineDatabaseProduct(conn); + switch (prod) { + case DERBY: + s = "values current_timestamp"; + break; + + case MYSQL: + case POSTGRES: + case SQLSERVER: + s = "select current_timestamp"; + break; + + case ORACLE: + s = "select current_timestamp from dual"; + break; + + default: + String msg = "Unknown database product: " + prod.toString(); + LOG.error(msg); + throw new MetaException(msg); + } + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (!rs.next()) throw new MetaException("No results from date query"); + return rs.getTimestamp(1).getTime(); + } catch (SQLException e) { + String msg = "Unable to determine current time: " + e.getMessage(); + LOG.error(msg); + throw new MetaException(msg); + } + } + + protected enum DatabaseProduct { DERBY, MYSQL, POSTGRES, ORACLE, SQLSERVER} + + /** + * Determine the database product type + * @param conn database connection + * @return database product type + * @throws MetaException if the type cannot be determined or is unknown + */ + protected DatabaseProduct determineDatabaseProduct(Connection conn) throws MetaException { + if (dbProduct == null) { + try { + String s = conn.getMetaData().getDatabaseProductName(); + if (s == null) { + String msg = "getDatabaseProductName returns null, can't determine database product"; + LOG.error(msg); + throw new MetaException(msg); + } else if (s.equals("Apache Derby")) { + dbProduct = DatabaseProduct.DERBY; + } else if (s.equals("Microsoft SQL Server")) { + dbProduct = DatabaseProduct.SQLSERVER; + } else if (s.equals("MySQL")) { + dbProduct = DatabaseProduct.MYSQL; + } else if (s.equals("Oracle")) { + dbProduct = DatabaseProduct.ORACLE; + } else if (s.equals("PostgreSQL")) { + dbProduct = DatabaseProduct.POSTGRES; + } else { + String msg = "Unrecognized database product name <" + s + ">"; + LOG.error(msg); + throw new MetaException(msg); + } + + } catch (SQLException e) { + String msg = "Unable to get database product name: " + e.getMessage(); + LOG.error(msg); + throw new MetaException(msg); + } + } + return dbProduct; + } + private static class LockInfo { long extLockId; long intLockId; @@ -947,7 +1032,7 @@ private void checkQFileTestHack() { * Abort a group of txns * @param dbConn An active connection * @param txnids list of transactions to abort - * @return + * @return Number of aborted transactions * @throws SQLException */ private int abortTxns(Connection dbConn, List txnids) throws SQLException { @@ -1070,7 +1155,7 @@ private LockResponse lock(Connection dbConn, LockRequest rqst, boolean wait) case SHARED_READ: lockChar = LOCK_SHARED; break; case SHARED_WRITE: lockChar = LOCK_SEMI_SHARED; break; } - long now = System.currentTimeMillis(); + long now = getDbTime(dbConn); s = "insert into HIVE_LOCKS " + " (hl_lock_ext_id, hl_lock_int_id, hl_txnid, hl_db, hl_table, " + "hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, hl_user, hl_host)" + @@ -1083,7 +1168,7 @@ private LockResponse lock(Connection dbConn, LockRequest rqst, boolean wait) LOG.debug("Going to execute update <" + s + ">"); stmt.executeUpdate(s); } - LockResponse rsp = checkLock(dbConn, extLockId, txnid, wait); + LockResponse rsp = checkLock(dbConn, extLockId, wait); if (!wait && rsp.getState() != LockState.ACQUIRED) { LOG.debug("Lock not acquired, going to rollback"); dbConn.rollback(); @@ -1100,14 +1185,12 @@ private LockResponse lock(Connection dbConn, LockRequest rqst, boolean wait) private LockResponse checkLock(Connection dbConn, long extLockId, - long txnid, boolean alwaysCommit) throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException, SQLException { List locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId); LockResponse response = new LockResponse(); response.setLockid(extLockId); - long now = System.currentTimeMillis(); LOG.debug("Setting savepoint"); Savepoint save = dbConn.setSavepoint(); Statement stmt = dbConn.createStatement(); @@ -1288,8 +1371,8 @@ private void wait(Connection dbConn, Savepoint save) throws SQLException { } private void acquire(Connection dbConn, Statement stmt, long extLockId, long intLockId) - throws SQLException, NoSuchLockException { - long now = System.currentTimeMillis(); + throws SQLException, NoSuchLockException, MetaException { + long now = getDbTime(dbConn); String s = "update HIVE_LOCKS set hl_lock_state = '" + LOCK_ACQUIRED + "', " + "hl_last_heartbeat = " + now + ", hl_acquired_at = " + now + " where hl_lock_ext_id = " + extLockId + " and hl_lock_int_id = " + intLockId; @@ -1308,11 +1391,11 @@ private void acquire(Connection dbConn, Statement stmt, long extLockId, long int // Heartbeats on the lock table. This commits, so do not enter it with any state private void heartbeatLock(Connection dbConn, long extLockId) - throws NoSuchLockException, SQLException { + throws NoSuchLockException, SQLException, MetaException { // If the lock id is 0, then there are no locks in this heartbeat if (extLockId == 0) return; Statement stmt = dbConn.createStatement(); - long now = System.currentTimeMillis(); + long now = getDbTime(dbConn); String s = "update HIVE_LOCKS set hl_last_heartbeat = " + now + " where hl_lock_ext_id = " + extLockId; @@ -1329,11 +1412,11 @@ private void heartbeatLock(Connection dbConn, long extLockId) // Heartbeats on the txn table. This commits, so do not enter it with any state private void heartbeatTxn(Connection dbConn, long txnid) - throws NoSuchTxnException, TxnAbortedException, SQLException { + throws NoSuchTxnException, TxnAbortedException, SQLException, MetaException { // If the txnid is 0, then there are no transactions in this heartbeat if (txnid == 0) return; Statement stmt = dbConn.createStatement(); - long now = System.currentTimeMillis(); + long now = getDbTime(dbConn); // We need to check whether this transaction is valid and open String s = "select txn_state from TXNS where txn_id = " + txnid + " for update"; @@ -1400,8 +1483,8 @@ private long getTxnIdFromLockId(Connection dbConn, long extLockId) // Clean time out locks from the database. This does a commit, // and thus should be done before any calls to heartbeat that will leave // open transactions. - private void timeOutLocks(Connection dbConn) throws SQLException { - long now = System.currentTimeMillis(); + private void timeOutLocks(Connection dbConn) throws SQLException, MetaException { + long now = getDbTime(dbConn); Statement stmt = dbConn.createStatement(); // Remove any timed out locks from the table. String s = "delete from HIVE_LOCKS where hl_last_heartbeat < " + @@ -1410,14 +1493,13 @@ private void timeOutLocks(Connection dbConn) throws SQLException { stmt.executeUpdate(s); LOG.debug("Going to commit"); dbConn.commit(); - return; } // Abort timed out transactions. This calls abortTxn(), which does a commit, // and thus should be done before any calls to heartbeat that will leave // open transactions on the underlying database. - private void timeOutTxns(Connection dbConn) throws SQLException { - long now = System.currentTimeMillis(); + private void timeOutTxns(Connection dbConn) throws SQLException, MetaException { + long now = getDbTime(dbConn); Statement stmt = dbConn.createStatement(); // Abort any timed out locks from the table. String s = "select txn_id from TXNS where txn_state = '" + TXN_OPEN +