diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 9988eec..29e0f76 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -20,7 +20,6 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; import org.apache.hive.common.util.ShutdownHookManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,7 +83,7 @@ private static ScheduledExecutorService heartbeatExecutorService = null; private ScheduledFuture heartbeatTask = null; private Runnable shutdownRunner = null; - static final int SHUTDOWN_HOOK_PRIORITY = 0; + private static final int SHUTDOWN_HOOK_PRIORITY = 0; DbTxnManager() { shutdownRunner = new Runnable() { @@ -324,9 +323,8 @@ private static Table getTable(WriteEntity we) { return t; } /** - * This is for testing only. + * This is for testing only. Normally client should call {@link #acquireLocks(QueryPlan, Context, String, boolean)} * @param delay time to delay for first heartbeat - * @return null if no locks were needed */ @VisibleForTesting void acquireLocksWithHeartbeatDelay(QueryPlan plan, Context ctx, String username, long delay) throws LockException { @@ -439,24 +437,25 @@ public void heartbeat() throws LockException { } } - private Heartbeater startHeartbeat() throws LockException { - return startHeartbeat(0); - } - /** - * This is for testing only. Normally client should call {@link #startHeartbeat()} - * Make the heartbeater start before an initial delay period. - * @param delay time to delay before first execution, in milliseconds - * @return heartbeater + * Start the heartbeater threadpool and return the task. + * @param initialDelay time to delay before first execution, in milliseconds + * @return heartbeater */ - Heartbeater startHeartbeat(long delay) throws LockException { + private Heartbeater startHeartbeat(long initialDelay) throws LockException { long heartbeatInterval = getHeartbeatInterval(conf); assert heartbeatInterval > 0; Heartbeater heartbeater = new Heartbeater(this, conf); + // For negative testing purpose.. + if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER)) { + initialDelay = 0; + } else if (initialDelay == 0) { + initialDelay = heartbeatInterval; + } heartbeatTask = heartbeatExecutorService.scheduleAtFixedRate( - heartbeater, delay, heartbeatInterval, TimeUnit.MILLISECONDS); - LOG.info("Started heartbeat with delay/interval = " + 0 + "/" + heartbeatInterval + " " + - TimeUnit.MILLISECONDS + " for query: " + queryId); + heartbeater, initialDelay, heartbeatInterval, TimeUnit.MILLISECONDS); + LOG.info("Started heartbeat with delay/interval = " + initialDelay + "/" + heartbeatInterval + + " " + TimeUnit.MILLISECONDS + " for query: " + queryId); return heartbeater; } @@ -584,7 +583,7 @@ public int getStatementId() { return statementId; } - public static long getHeartbeatInterval(Configuration conf) throws LockException { + private static long getHeartbeatInterval(Configuration conf) throws LockException { // Retrieve HIVE_TXN_TIMEOUT in MILLISECONDS (it's defined as SECONDS), // then divide it by 2 to give us a safety factor. long interval = @@ -612,7 +611,7 @@ public LockException getLockException() { * * @param txnMgr transaction manager for this operation */ - public Heartbeater(HiveTxnManager txnMgr, HiveConf conf) { + Heartbeater(HiveTxnManager txnMgr, HiveConf conf) { this.txnMgr = txnMgr; this.conf = conf; lockException = null;