diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 9988eec..b14a283 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -20,7 +20,6 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; import org.apache.hive.common.util.ShutdownHookManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,7 +83,7 @@ private static ScheduledExecutorService heartbeatExecutorService = null; private ScheduledFuture heartbeatTask = null; private Runnable shutdownRunner = null; - static final int SHUTDOWN_HOOK_PRIORITY = 0; + private static final int SHUTDOWN_HOOK_PRIORITY = 0; DbTxnManager() { shutdownRunner = new Runnable() { @@ -139,6 +138,7 @@ public HiveLockManager getLockManager() throws LockException { @Override public void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException { try { + queryId = plan.getQueryId(); acquireLocksWithHeartbeatDelay(plan, ctx, username, 0); } catch(LockException e) { @@ -162,9 +162,9 @@ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isB boolean atLeastOneLock = false; - LockRequestBuilder rqstBuilder = new LockRequestBuilder(plan.getQueryId()); + LockRequestBuilder rqstBuilder = new LockRequestBuilder(queryId); //link queryId to txnId - LOG.info("Setting lock request transaction to " + JavaUtils.txnIdToString(txnId) + " for queryId=" + plan.getQueryId()); + LOG.info("Setting lock request transaction to " + JavaUtils.txnIdToString(txnId) + " for queryId=" + queryId); rqstBuilder.setTransactionId(txnId) .setUser(username); @@ -304,7 +304,7 @@ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isB // Make sure we need locks. It's possible there's nothing to lock in // this operation. if (!atLeastOneLock) { - LOG.debug("No locks needed for queryId" + plan.getQueryId()); + LOG.debug("No locks needed for queryId" + queryId); return null; } @@ -312,7 +312,7 @@ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isB if(isTxnOpen()) { statementId++; } - LockState lockState = lockMgr.lock(rqstBuilder.build(), plan.getQueryId(), isBlocking, locks); + LockState lockState = lockMgr.lock(rqstBuilder.build(), queryId, isBlocking, locks); ctx.setHiveLocks(locks); return lockState; } @@ -324,15 +324,13 @@ private static Table getTable(WriteEntity we) { return t; } /** - * This is for testing only. + * This is for testing only. Normally client should call {@link #acquireLocks(QueryPlan, Context, String, boolean)} * @param delay time to delay for first heartbeat - * @return null if no locks were needed */ @VisibleForTesting void acquireLocksWithHeartbeatDelay(QueryPlan plan, Context ctx, String username, long delay) throws LockException { acquireLocks(plan, ctx, username, true); ctx.setHeartbeater(startHeartbeat(delay)); - queryId = plan.getQueryId(); } @@ -439,24 +437,25 @@ public void heartbeat() throws LockException { } } - private Heartbeater startHeartbeat() throws LockException { - return startHeartbeat(0); - } - /** - * This is for testing only. Normally client should call {@link #startHeartbeat()} - * Make the heartbeater start before an initial delay period. - * @param delay time to delay before first execution, in milliseconds - * @return heartbeater + * Start the heartbeater threadpool and return the task. + * @param initialDelay time to delay before first execution, in milliseconds + * @return heartbeater */ - Heartbeater startHeartbeat(long delay) throws LockException { + private Heartbeater startHeartbeat(long initialDelay) throws LockException { long heartbeatInterval = getHeartbeatInterval(conf); assert heartbeatInterval > 0; Heartbeater heartbeater = new Heartbeater(this, conf); + // For negative testing purpose.. + if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER)) { + initialDelay = 0; + } else if (initialDelay == 0) { + initialDelay = heartbeatInterval; + } heartbeatTask = heartbeatExecutorService.scheduleAtFixedRate( - heartbeater, delay, heartbeatInterval, TimeUnit.MILLISECONDS); - LOG.info("Started heartbeat with delay/interval = " + 0 + "/" + heartbeatInterval + " " + - TimeUnit.MILLISECONDS + " for query: " + queryId); + heartbeater, initialDelay, heartbeatInterval, TimeUnit.MILLISECONDS); + LOG.info("Started heartbeat with delay/interval = " + initialDelay + "/" + heartbeatInterval + + " " + TimeUnit.MILLISECONDS + " for query: " + queryId); return heartbeater; } @@ -584,7 +583,7 @@ public int getStatementId() { return statementId; } - public static long getHeartbeatInterval(Configuration conf) throws LockException { + private static long getHeartbeatInterval(Configuration conf) throws LockException { // Retrieve HIVE_TXN_TIMEOUT in MILLISECONDS (it's defined as SECONDS), // then divide it by 2 to give us a safety factor. long interval = @@ -612,7 +611,7 @@ public LockException getLockException() { * * @param txnMgr transaction manager for this operation */ - public Heartbeater(HiveTxnManager txnMgr, HiveConf conf) { + Heartbeater(HiveTxnManager txnMgr, HiveConf conf) { this.txnMgr = txnMgr; this.conf = conf; lockException = null;