diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 943aa383bb..684cab0a95 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -68,6 +68,7 @@ Licensed to the Apache Software Foundation (ASF) under one import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; /** * An implementation of HiveTxnManager that stores the transactions in the metastore database. @@ -160,6 +161,7 @@ Licensed to the Apache Software Foundation (ASF) under one private ScheduledFuture heartbeatTask = null; private Runnable shutdownRunner = null; private static final int SHUTDOWN_HOOK_PRIORITY = 0; + private final ReentrantLock heartbeatTaskLock = new ReentrantLock(); /** * We do this on every call to make sure TM uses same MS connection as is used by the caller (Driver, @@ -270,9 +272,7 @@ public void acquireLocks(QueryPlan plan, Context ctx, String username) throws Lo } catch(LockException e) { if(e.getCause() instanceof TxnAbortedException) { - txnId = 0; - stmtId = -1; - tableWriteIds.clear(); + resetTxnInfo(); } throw e; } @@ -529,11 +529,18 @@ public void rollbackTxn() throws LockException { if (!isTxnOpen()) { throw new RuntimeException("Attempt to rollback before opening a transaction"); } + stopHeartbeat(); + try { lockMgr.clearLocalLockRecords(); - stopHeartbeat(); LOG.debug("Rolling back " + JavaUtils.txnIdToString(txnId)); - getMS().rollbackTxn(txnId); + + // Re-checking as txn could have been closed, in the meantime, by a competing thread. + if (isTxnOpen()) { + getMS().rollbackTxn(txnId); + } else { + LOG.warn("Transaction is already closed."); + } } catch (NoSuchTxnException e) { LOG.error("Metastore could not find " + JavaUtils.txnIdToString(txnId)); throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(txnId)); @@ -543,10 +550,7 @@ public void rollbackTxn() throws LockException { throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e); } finally { - txnId = 0; - stmtId = -1; - numStatements = 0; - tableWriteIds.clear(); + resetTxnInfo(); } } @@ -629,11 +633,21 @@ private Heartbeater startHeartbeat(long initialDelay) throws LockException { throw new LockException("error while getting current user,", e); } - Heartbeater heartbeater = new Heartbeater(this, conf, queryId, currentUser); - heartbeatTask = startHeartbeat(initialDelay, heartbeatInterval, heartbeater); - LOG.debug("Started heartbeat with delay/interval = " + initialDelay + "/" + heartbeatInterval + - " " + TimeUnit.MILLISECONDS + " for query: " + queryId); - return heartbeater; + try { + heartbeatTaskLock.lock(); + if (heartbeatTask != null) { + throw new IllegalStateException("Heartbeater is already started."); + } + + Heartbeater heartbeater = new Heartbeater(this, conf, queryId, currentUser); + heartbeatTask = startHeartbeat(initialDelay, heartbeatInterval, heartbeater); + LOG.debug("Started heartbeat with delay/interval = " + initialDelay + "/" + heartbeatInterval + + " " + TimeUnit.MILLISECONDS + " for query: " + queryId); + + return heartbeater; + } finally { + heartbeatTaskLock.unlock(); + } } private ScheduledFuture startHeartbeat(long initialDelay, long heartbeatInterval, Runnable heartbeater) { @@ -651,30 +665,49 @@ private Heartbeater startHeartbeat(long initialDelay) throws LockException { return task; } - private void stopHeartbeat() throws LockException { - if (heartbeatTask != null) { - heartbeatTask.cancel(true); - long startTime = System.currentTimeMillis(); - long sleepInterval = 100; - while (!heartbeatTask.isCancelled() && !heartbeatTask.isDone()) { - // We will wait for 30 seconds for the task to be cancelled. - // If it's still not cancelled (unlikely), we will just move on. - long now = System.currentTimeMillis(); - if (now - startTime > 30000) { - LOG.warn("Heartbeat task cannot be cancelled for unknown reason. QueryId: " + queryId); - break; + private void stopHeartbeat() { + if (heartbeatTask == null) { + // avoid unnecessary locking if the field is null + return; + } + + boolean isLockAcquired = false; + try { + // The lock should not be held by other thread trying to stop the heartbeat for more than 31 seconds + isLockAcquired = heartbeatTaskLock.tryLock(31000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + // safe to go on + } + + try { + if (isLockAcquired && heartbeatTask != null) { + heartbeatTask.cancel(true); + long startTime = System.currentTimeMillis(); + long sleepInterval = 100; + while (!heartbeatTask.isCancelled() && !heartbeatTask.isDone()) { + // We will wait for 30 seconds for the task to be cancelled. + // If it's still not cancelled (unlikely), we will just move on. + long now = System.currentTimeMillis(); + if (now - startTime > 30000) { + LOG.warn("Heartbeat task cannot be cancelled for unknown reason. QueryId: " + queryId); + break; + } + try { + Thread.sleep(sleepInterval); + } catch (InterruptedException e) { + } + sleepInterval *= 2; } - try { - Thread.sleep(sleepInterval); - } catch (InterruptedException e) { + if (heartbeatTask.isCancelled() || heartbeatTask.isDone()) { + LOG.info("Stopped heartbeat for query: " + queryId); } - sleepInterval *= 2; + heartbeatTask = null; + queryId = null; } - if (heartbeatTask.isCancelled() || heartbeatTask.isDone()) { - LOG.info("Stopped heartbeat for query: " + queryId); + } finally { + if (isLockAcquired) { + heartbeatTaskLock.unlock(); } - heartbeatTask = null; - queryId = null; } }