diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 31f0634..df4e8d1 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -173,6 +173,7 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, HiveConf.ConfVars.HIVE_TXN_MANAGER, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, + HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE, HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH, HiveConf.ConfVars.HIVE_METASTORE_STATS_NDV_DENSITY_FUNCTION, HiveConf.ConfVars.METASTORE_AGGREGATE_STATS_CACHE_ENABLED, @@ -1505,6 +1506,8 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { "no transactions."), HIVE_TXN_TIMEOUT("hive.txn.timeout", "300s", new TimeValidator(TimeUnit.SECONDS), "time after which transactions are declared aborted if the client has not sent a heartbeat."), + HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE("hive.txn.heartbeat.threadpool.size", 5, "The number of " + + "threads to use for heartbeating. For Hive CLI, 1 is enough. For HiveServer2, we need a few"), TXN_MGR_DUMP_LOCK_STATE_ON_ACQUIRE_TIMEOUT("hive.txn.manager.dump.lock.state.on.acquire.timeout", false, "Set this to true so that when attempt to acquire a lock on resource times out, the current state" + " of the lock manager is dumped to log file. This is for debugging. 
See also " + diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index f6af6ca..123c666 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -399,6 +399,7 @@ public int compile(String command, boolean resetTaskIds) { public void run() { try { releaseLocksAndCommitOrRollback(false, txnManager); + txnManager.closeTxnManager(); } catch (LockException e) { LOG.warn("Exception when releasing locks in ShutdownHook for Driver: " + e.getMessage()); @@ -1010,7 +1011,7 @@ private void releaseLocksAndCommitOrRollback(boolean commit, HiveTxnManager txnM hiveLocks.addAll(ctx.getHiveLocks()); } if (!hiveLocks.isEmpty()) { - txnMgr.getLockManager().releaseLocks(hiveLocks); + txnMgr.releaseLocks(hiveLocks); } } hiveLocks.clear(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java deleted file mode 100644 index ff64563..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hive.ql.exec; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; -import org.apache.hadoop.hive.ql.lockmgr.LockException; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; - -/** - * Class to handle heartbeats for MR and Tez tasks. - */ -public class Heartbeater { - private long lastHeartbeat = 0; - private long heartbeatInterval = 0; - private boolean dontHeartbeat = false; - private HiveTxnManager txnMgr; - private Configuration conf; - - static final private Logger LOG = LoggerFactory.getLogger(Heartbeater.class.getName()); - - /** - * - * @param txnMgr transaction manager for this operation - * @param conf Configuration for this operation - */ - public Heartbeater(HiveTxnManager txnMgr, Configuration conf) { - this.txnMgr = txnMgr; - this.conf = conf; - } - - /** - * Send a heartbeat to the metastore for locks and transactions. - * @throws IOException - */ - public void heartbeat() throws IOException { - if (dontHeartbeat) return; - - if (txnMgr == null) { - LOG.debug("txnMgr null, not heartbeating"); - dontHeartbeat = true; - return; - } - - if (heartbeatInterval == 0) { - // Multiply the heartbeat interval by 1000 to convert to milliseconds, - // but divide by 2 to give us a safety factor. 
- heartbeatInterval = HiveConf.getTimeVar( - conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) / 2; - if (heartbeatInterval == 0) { - LOG.warn(HiveConf.ConfVars.HIVE_TXN_MANAGER.toString() + " not set, heartbeats won't be sent"); - dontHeartbeat = true; - LOG.debug("heartbeat interval 0, not heartbeating"); - return; - } - } - long now = System.currentTimeMillis(); - if (now - lastHeartbeat > heartbeatInterval) { - try { - LOG.debug("heartbeating"); - txnMgr.heartbeat(); - } catch (LockException e) { - LOG.warn("Failed trying to heartbeat " + e.getMessage()); - throw new IOException(e); - } - lastHeartbeat = now; - } - } - -} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java index 971dac9..00f2695 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java @@ -429,7 +429,7 @@ public int execute(DriverContext driverContext) { // Finally SUBMIT the JOB! 
rj = jc.submitJob(job); - returnVal = jobExecHelper.progress(rj, jc, ctx.getHiveTxnManager()); + returnVal = jobExecHelper.progress(rj, jc); success = (returnVal == 0); } catch (Exception e) { e.printStackTrace(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java index 5f35630..1b296b9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java @@ -33,13 +33,11 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.MapRedStats; -import org.apache.hadoop.hive.ql.exec.Heartbeater; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskHandle; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; -import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.plan.ReducerTimeStatsPerJob; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; @@ -230,14 +228,12 @@ private MapRedStats progress(ExecDriverTaskHandle th) throws IOException { int numReduce = -1; List clientStatPublishers = getClientStatPublishers(); final boolean localMode = ShimLoader.getHadoopShims().isLocalMode(job); - Heartbeater heartbeater = new Heartbeater(th.getTxnManager(), job); while (!rj.isComplete()) { try { Thread.sleep(pullInterval); } catch (InterruptedException e) { } - heartbeater.heartbeat(); if (initializing && rj.getJobState() == JobStatus.PREP) { // No reason to poll untill the job is initialized @@ -451,7 +447,6 @@ public void jobInfo(RunningJob rj) { private static class ExecDriverTaskHandle extends TaskHandle { JobClient jc; RunningJob rj; - HiveTxnManager txnMgr; JobClient 
getJobClient() { return jc; @@ -461,14 +456,9 @@ RunningJob getRunningJob() { return rj; } - HiveTxnManager getTxnManager() { - return txnMgr; - } - - public ExecDriverTaskHandle(JobClient jc, RunningJob rj, HiveTxnManager txnMgr) { + public ExecDriverTaskHandle(JobClient jc, RunningJob rj) { this.jc = jc; this.rj = rj; - this.txnMgr = txnMgr; } public void setRunningJob(RunningJob job) { @@ -522,7 +512,7 @@ public int progressLocal(Process runningJob, String taskId) { } - public int progress(RunningJob rj, JobClient jc, HiveTxnManager txnMgr) throws IOException { + public int progress(RunningJob rj, JobClient jc) throws IOException { jobId = rj.getID(); int returnVal = 0; @@ -543,7 +533,7 @@ public int progress(RunningJob rj, JobClient jc, HiveTxnManager txnMgr) throws I runningJobs.add(rj); - ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj, txnMgr); + ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj); jobInfo(rj); MapRedStats mapRedStats = progress(th); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java index f6bc19c..3d82717 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java @@ -39,11 +39,9 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; -import org.apache.hadoop.hive.ql.exec.Heartbeater; import org.apache.hadoop.hive.ql.exec.MapOperator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.session.SessionState; @@ -63,21 +61,6 @@ import com.google.common.base.Preconditions; -import java.io.IOException; -import java.io.PrintStream; -import java.text.DecimalFormat; 
-import java.text.NumberFormat; -import java.util.Collections; -import java.util.EnumSet; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; - /** * TezJobMonitor keeps track of a tez job while it's being executed. It will * print status to the console and retrieve final status of the job after @@ -220,11 +203,10 @@ public void repositionCursor() { * monitorExecution handles status printing, failures during execution and final status retrieval. * * @param dagClient client that was used to kick off the job - * @param txnMgr transaction manager for this operation * @param conf configuration file for this operation * @return int 0 - success, 1 - killed, 2 - failed */ - public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr, HiveConf conf, + public int monitorExecution(final DAGClient dagClient, HiveConf conf, DAG dag) throws InterruptedException { long monitorStartTime = System.currentTimeMillis(); DAGStatus status = null; @@ -238,7 +220,6 @@ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr, Hi DAGStatus.State lastState = null; String lastReport = null; Set opts = new HashSet(); - Heartbeater heartbeater = new Heartbeater(txnMgr, conf); long startTime = 0; boolean isProfileEnabled = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) || Utilities.isPerfOrAboveLogging(conf); @@ -255,7 +236,6 @@ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr, Hi status = dagClient.getDAGStatus(opts, checkInterval); Map progressMap = status.getVertexProgress(); DAGStatus.State state = status.getState(); - heartbeater.heartbeat(); if (state != lastState || state == RUNNING) { lastState = state; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index a2060da..08109e0 
100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -191,7 +191,7 @@ public int execute(DriverContext driverContext) { // finally monitor will print progress until the job is done TezJobMonitor monitor = new TezJobMonitor(work.getWorkMap()); - rc = monitor.monitorExecution(dagClient, ctx.getHiveTxnManager(), conf, dag); + rc = monitor.monitorExecution(dagClient, conf, dag); if (rc != 0) { this.setException(new HiveException(monitor.getDiagnostics())); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java index 7453145..e23a969 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java @@ -149,7 +149,7 @@ public int execute(DriverContext driverContext) { // Finally SUBMIT the JOB! rj = jc.submitJob(job); - returnVal = jobExecHelper.progress(rj, jc, null); + returnVal = jobExecHelper.progress(rj, jc); success = (returnVal == 0); } catch (Exception e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java index 188e9a6..829a9f6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java @@ -221,7 +221,7 @@ public int execute(DriverContext driverContext) { // Finally SUBMIT the JOB! 
rj = jc.submitJob(job); - returnVal = jobExecHelper.progress(rj, jc, null); + returnVal = jobExecHelper.progress(rj, jc); success = (returnVal == 0); } catch (Exception e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java index 08e3d80..34c067a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java @@ -185,7 +185,7 @@ public int execute(DriverContext driverContext) { // Finally SUBMIT the JOB! rj = jc.submitJob(job); - returnVal = jobExecHelper.progress(rj, jc, null); + returnVal = jobExecHelper.progress(rj, jc); success = (returnVal == 0); } catch (Exception e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java index 7d58622..b1190fd 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java @@ -218,13 +218,15 @@ public void unlock(HiveLock hiveLock) throws LockException { @Override public void releaseLocks(List hiveLocks) { - for (HiveLock lock : hiveLocks) { - try { - unlock(lock); - } catch (LockException e) { - // Not sure why this method doesn't throw any exceptions, - // but since the interface doesn't allow it we'll just swallow them and - // move on. + if (hiveLocks != null && !hiveLocks.isEmpty()) { + for (HiveLock lock : hiveLocks) { + try { + unlock(lock); + } catch (LockException e) { + // Not sure why this method doesn't throw any exceptions, + // but since the interface doesn't allow it we'll just swallow them and + // move on. 
+ } } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 552367c..51ed576 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hive.ql.lockmgr; +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.common.JavaUtils; @@ -39,6 +41,13 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; /** * An implementation of HiveTxnManager that stores the transactions in the @@ -65,6 +74,10 @@ */ private int statementId = -1; + // ExecutorService for sending heartbeat to metastore periodically. + private static ScheduledExecutorService heartbeatExecutorService = null; + private ScheduledFuture heartbeatTask = null; + DbTxnManager() { } @@ -104,6 +117,7 @@ public HiveLockManager getLockManager() throws LockException { @Override public void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException { acquireLocks(plan, ctx, username, true); + startHeartbeat(); } /** @@ -245,6 +259,24 @@ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isB ctx.setHiveLocks(locks); return lockState; } + /** + * This is for testing only. 
+ * @param delay time to delay for first heartbeat + * @throws LockException if the locks cannot be acquired + */ + @VisibleForTesting + void acquireLocksWithHeartbeatDelay(QueryPlan plan, Context ctx, String username, long delay) throws LockException { + acquireLocks(plan, ctx, username, true); + startHeartbeatWithDelay(delay); + } + + + @Override + public void releaseLocks(List hiveLocks) throws LockException { + HiveLockManager lockManager = this.getLockManager(); + lockManager.releaseLocks(hiveLocks); + stopHeartbeat(); + } @Override public void commitTxn() throws LockException { @@ -253,6 +285,7 @@ public void commitTxn() throws LockException { } try { lockMgr.clearLocalLockRecords(); + stopHeartbeat(); LOG.debug("Committing txn " + JavaUtils.txnIdToString(txnId)); client.commitTxn(txnId); } catch (NoSuchTxnException e) { @@ -277,6 +310,7 @@ public void rollbackTxn() throws LockException { } try { lockMgr.clearLocalLockRecords(); + stopHeartbeat(); LOG.debug("Rolling back " + JavaUtils.txnIdToString(txnId)); client.rollbackTxn(txnId); } catch (NoSuchTxnException e) { @@ -337,6 +371,32 @@ public void heartbeat() throws LockException { } } + private void startHeartbeat() throws LockException { + startHeartbeatWithDelay(0); + } + + /** + * This is for testing only. Normally client should call {@link #startHeartbeat()} + * Make the heartbeater start before a specific delay period. 
+ * @param delay time to delay before first execution, in milliseconds + */ + void startHeartbeatWithDelay(long delay) throws LockException { + initHeartbeatExecutorService(); + long heartbeatInterval = Heartbeater.getHeartbeatInterval(conf); + assert heartbeatInterval > 0; + heartbeatTask = heartbeatExecutorService.scheduleAtFixedRate( + new Heartbeater(this), delay, heartbeatInterval, TimeUnit.MILLISECONDS); + LOG.info("Started " + Heartbeater.class.getName() + " with delay/interval = " + + delay + "/" + heartbeatInterval + " " + TimeUnit.MILLISECONDS); + } + + private void stopHeartbeat() { + if (heartbeatTask != null && !heartbeatTask.isCancelled() && !heartbeatTask.isDone()) { + heartbeatTask.cancel(true); + heartbeatTask = null; + } + } + @Override public ValidTxnList getValidTxns() throws LockException { init(); @@ -366,6 +426,13 @@ public boolean supportsAcid() { @Override protected void destruct() { try { + // clean up heartbeat thread pool + if (heartbeatExecutorService != null + && !heartbeatExecutorService.isShutdown() + && !heartbeatExecutorService.isTerminated()) { + heartbeatExecutorService.shutdown(); + LOG.info("Shutting down Heartbeater thread pool."); + } if (isTxnOpen()) rollbackTxn(); if (lockMgr != null) lockMgr.close(); } catch (Exception e) { @@ -384,6 +451,7 @@ private void init() throws LockException { try { Hive db = Hive.get(conf); client = db.getMSC(); + initHeartbeatExecutorService(); } catch (MetaException e) { throw new LockException(ErrorMsg.METASTORE_COULD_NOT_INITIATE.getMsg(), e); } catch (HiveException e) { @@ -391,6 +459,26 @@ private void init() throws LockException { } } } + + void initHeartbeatExecutorService() { + if (heartbeatExecutorService != null + && !heartbeatExecutorService.isShutdown() + && !heartbeatExecutorService.isTerminated()) { + return; + } + + int threadPoolSize = conf.getIntVar(HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE); + heartbeatExecutorService = + Executors.newScheduledThreadPool(threadPoolSize, 
new ThreadFactory() { + private final AtomicInteger threadCounter = new AtomicInteger(); + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "Heartbeater-" + threadCounter.getAndIncrement()); + } + }); + ((ScheduledThreadPoolExecutor) heartbeatExecutorService).setRemoveOnCancelPolicy(true); + } + @Override public boolean isTxnOpen() { return txnId > 0; @@ -403,4 +491,45 @@ public long getCurrentTxnId() { public int getStatementId() { return statementId; } + + + /** + * Heartbeater thread + */ + public static class Heartbeater implements Runnable { + private HiveTxnManager txnMgr; + + /** + * + * @param txnMgr transaction manager for this operation + */ + public Heartbeater(HiveTxnManager txnMgr) { + this.txnMgr = txnMgr; + } + + /** + * Send a heartbeat to the metastore for locks and transactions. + */ + @Override + public void run() { + try { + LOG.debug("Heartbeating..."); + txnMgr.heartbeat(); + } catch (LockException e) { + LOG.error("Failed trying to heartbeat " + e.getMessage()); + } + } + + public static long getHeartbeatInterval(Configuration conf) throws LockException { + // Multiply the heartbeat interval by 1000 to convert to milliseconds, + // but divide by 2 to give us a safety factor. 
+ long interval = + HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.SECONDS) * 1000 / 2; + if (interval == 0) { + throw new LockException(HiveConf.ConfVars.HIVE_TXN_TIMEOUT.toString() + " not set," + + " heartbeats won't be sent"); + } + return interval; + } + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java index 2d30198..cac4507 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java @@ -181,6 +181,12 @@ else if (output.getTyp() == WriteEntity.Type.DUMMYPARTITION) { } @Override + public void releaseLocks(List hiveLocks) throws LockException { + HiveLockManager lockManager = this.getLockManager(); + lockManager.releaseLocks(hiveLocks); + } + + @Override public void commitTxn() throws LockException { // No-op } diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java index 2bfc732..4862c79 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java @@ -27,6 +27,8 @@ import org.apache.hadoop.hive.ql.plan.UnlockDatabaseDesc; import org.apache.hadoop.hive.ql.plan.UnlockTableDesc; +import java.util.List; + /** * An interface that allows Hive to manage transactions. All classes * implementing this should extend {@link HiveTxnManagerImpl} rather than @@ -67,6 +69,12 @@ void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException; /** + * Release all the locks needed by a query. + * @param hiveLocks The list of locks to be released. + */ + void releaseLocks(List hiveLocks) throws LockException; + + /** * Commit the current transaction. 
This will release all locks obtained in * {@link #acquireLocks(org.apache.hadoop.hive.ql.QueryPlan, * org.apache.hadoop.hive.ql.Context, java.lang.String)}. diff --git ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java index b7d1d18..53cc2a3 100644 --- ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java +++ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java @@ -42,6 +42,7 @@ import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; /** * Unit tests for {@link DbTxnManager}. @@ -200,8 +201,8 @@ private void runReaper() throws Exception { public void testExceptions() throws Exception { WriteEntity we = addPartitionOutput(newTable(true), WriteEntity.WriteType.INSERT); QueryPlan qp = new MockQueryPlan(this); - txnMgr.acquireLocks(qp, ctx, "PeterI"); - txnMgr.openTxn("NicholasII"); + txnMgr.acquireLocks(qp, ctx, "Peter"); + txnMgr.openTxn("Peter"); runReaper(); LockException exception = null; try { @@ -212,8 +213,9 @@ public void testExceptions() throws Exception { } Assert.assertNotNull("Expected exception1", exception); Assert.assertEquals("Wrong Exception1", ErrorMsg.TXN_ABORTED, exception.getCanonicalErrorMsg()); + exception = null; - txnMgr.openTxn("AlexanderIII"); + txnMgr.openTxn("Peter"); runReaper(); try { txnMgr.rollbackTxn(); @@ -223,20 +225,6 @@ public void testExceptions() throws Exception { } Assert.assertNotNull("Expected exception2", exception); Assert.assertEquals("Wrong Exception2", ErrorMsg.TXN_NO_SUCH_TRANSACTION, exception.getCanonicalErrorMsg()); - exception = null; - txnMgr.openTxn("PeterI"); - txnMgr.acquireLocks(qp, ctx, "PeterI"); - List locks = ctx.getHiveLocks(); - Assert.assertThat("Unexpected lock count", locks.size(), is(1)); - runReaper(); - try { - txnMgr.heartbeat(); - } - catch(LockException ex) { - exception = ex; - } - Assert.assertNotNull("Expected exception3", exception); - 
Assert.assertEquals("Wrong Exception3", ErrorMsg.TXN_ABORTED, exception.getCanonicalErrorMsg()); } @Test @@ -355,16 +343,83 @@ public void concurrencyFalse() throws Exception { Assert.assertTrue(sawException); } + @Test + public void testLockAcquisitionAndRelease() throws Exception { + addTableInput(); + QueryPlan qp = new MockQueryPlan(this); + txnMgr.acquireLocks(qp, ctx, "fred"); + List locks = ctx.getHiveLocks(); + Assert.assertEquals(1, locks.size()); + txnMgr.releaseLocks(locks); + locks = txnMgr.getLockManager().getLocks(false, false); + Assert.assertEquals(0, locks.size()); + } + + @Test + public void testHeartbeater() throws Exception { + HiveTxnManager txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); + Assert.assertTrue(txnMgr instanceof DbTxnManager); + // Init heartbeater thread pool + ((DbTxnManager) txnMgr).initHeartbeatExecutorService(); + + addTableInput(); + LockException exception = null; + QueryPlan qp = new MockQueryPlan(this); + + // Case 1: If there's no delay for the heartbeat, txn should be able to commit + txnMgr.openTxn("fred"); + txnMgr.acquireLocks(qp, ctx, "fred"); // heartbeat started.. 
+ runReaper(); + try { + txnMgr.commitTxn(); + } catch (LockException e) { + exception = e; + } + Assert.assertNull("Txn commit should be successful", exception); + exception = null; + + // Case 2: If there's delay for the heartbeat, but the delay is within the reaper's tolerance, + // then txt should be able to commit + txnMgr.openTxn("tom"); + // Start the heartbeat after a delay, which is shorter than the HIVE_TXN_TIMEOUT + ((DbTxnManager) txnMgr).acquireLocksWithHeartbeatDelay(qp, ctx, "tom", + HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS)); + runReaper(); + try { + txnMgr.commitTxn(); + } catch (LockException e) { + exception = e; + } + Assert.assertNull("Txn commit should also be successful", exception); + exception = null; + + // Case 3: If there's delay for the heartbeat, and the delay is long enough to trigger the reaper, + // then the txn will time out and be aborted + txnMgr.openTxn("jerry"); + // Start the heartbeat after a delay, which exceeds the HIVE_TXN_TIMEOUT + ((DbTxnManager) txnMgr).acquireLocksWithHeartbeatDelay(qp, ctx, "jerry", + HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 1000); + runReaper(); + try { + txnMgr.commitTxn(); + } catch (LockException e) { + exception = e; + } + Assert.assertNotNull("Txn should have been aborted", exception); + Assert.assertEquals(ErrorMsg.TXN_ABORTED, exception.getCanonicalErrorMsg()); + } + @Before public void setUp() throws Exception { TxnDbUtil.prepDb(); txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); Assert.assertTrue(txnMgr instanceof DbTxnManager); + ((DbTxnManager) txnMgr).initHeartbeatExecutorService(); nextInput = 1; readEntities = new HashSet(); writeEntities = new HashSet(); conf.setTimeVar(HiveConf.ConfVars.HIVE_TIMEDOUT_TXN_REAPER_START, 0, TimeUnit.SECONDS); - conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 1, TimeUnit.MILLISECONDS); + conf.setTimeVar(HiveConf.ConfVars.HIVE_TXN_TIMEOUT, 2, 
TimeUnit.MILLISECONDS); houseKeeperService = new AcidHouseKeeperService(); }