diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 93c7a54..e1cda0a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -33,6 +33,12 @@
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.lang.StringUtils;
@@ -51,6 +57,7 @@
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.ExplainTask;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.Heartbeater;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -129,6 +136,16 @@
   private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
   static final private LogHelper console = new LogHelper(LOG);
   static final int SHUTDOWN_HOOK_PRIORITY = 0;
+  private static final int THREAD_POOL_SIZE = 10;
+  // ExecutorService for sending heartbeats to the metastore periodically. Mainly for transaction use.
+  private static final ScheduledExecutorService heartbeatExecutorService =
+      Executors.newScheduledThreadPool(THREAD_POOL_SIZE, new ThreadFactory() {
+        private final AtomicInteger threadCounter = new AtomicInteger();
+        @Override
+        public Thread newThread(Runnable r) {
+          return new Thread(r, "Heartbeater-" + threadCounter.getAndIncrement());
+        }
+      });
 
   private int maxRows = 100;
   ByteStream.Output bos = new ByteStream.Output();
@@ -161,6 +178,8 @@
   // HS2 operation handle guid string
   private String operationId;
 
+  private ScheduledFuture heartbeatTask;
+
   private boolean checkConcurrency() {
     boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
     if (!supportConcurrency) {
@@ -400,6 +419,10 @@ public int compile(String command, boolean resetTaskIds) {
       public void run() {
         try {
           releaseLocksAndCommitOrRollback(false, txnManager);
+          if (heartbeatExecutorService != null && !heartbeatExecutorService.isShutdown()) {
+            heartbeatExecutorService.shutdown();
+            LOG.info("Shut down Heartbeat thread pool.");
+          }
         } catch (LockException e) {
           LOG.warn("Exception when releasing locks in ShutdownHook for Driver: " +
               e.getMessage());
@@ -1126,6 +1149,8 @@ private void releaseResources() {
       driverCxt = null;
     }
     plan = null;
+
+    stopHeartbeatTask();
   }
 
   @Override
@@ -1326,6 +1351,15 @@ else if(!plan.getAutoCommitValue() && txnManager.getAutoCommit()) {
       if (ret != 0) {
         return rollback(createProcessorResponse(ret));
       }
+      long heartbeatInterval = Heartbeater.getHeartbeatInterval(conf);
+      if (heartbeatInterval == 0) {
+        LOG.warn(HiveConf.ConfVars.HIVE_TXN_MANAGER.toString() + " not set, heartbeats won't be sent");
+      } else {
+        heartbeatTask = heartbeatExecutorService.scheduleAtFixedRate(
+            new Heartbeater(txnManager, conf), 0, heartbeatInterval, TimeUnit.MILLISECONDS);
+        LOG.info("Started " + Heartbeater.class.getName() + " with delay/interval = " +
+            0 + "/" + heartbeatInterval + " " + TimeUnit.MILLISECONDS);
+      }
     }
     ret = execute();
     if (ret != 0) {
@@ -1902,6 +1938,9 @@ public int close() {
           LOG.debug(" Exception while closing the resStream ", e);
         }
       }
+
+      stopHeartbeatTask();
+
     } catch (Exception e) {
       console.printError("FAILED: Hive Internal Error: " + Utilities.getNameMessage(e) + "\n"
           + org.apache.hadoop.util.StringUtils.stringifyException(e));
@@ -1924,6 +1963,8 @@ public void destroy() {
             e.getMessage());
       }
     }
+
+    stopHeartbeatTask();
   }
 
   public org.apache.hadoop.hive.ql.plan.api.Query getQueryPlan() throws IOException {
@@ -1942,4 +1983,11 @@ public void setOperationId(String opId) {
     this.operationId = opId;
   }
 
+  private void stopHeartbeatTask() {
+    if (heartbeatTask != null && !heartbeatTask.isCancelled() && !heartbeatTask.isDone()) {
+      heartbeatTask.cancel(true);
+      heartbeatTask = null;
+    }
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java
index ff64563..e415abf 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java
@@ -24,13 +24,12 @@
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
 
-import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 
 /**
  * Class to handle heartbeats for MR and Tez tasks.
  */
-public class Heartbeater {
+public class Heartbeater implements Runnable {
   private long lastHeartbeat = 0;
   private long heartbeatInterval = 0;
   private boolean dontHeartbeat = false;
@@ -51,9 +50,9 @@ public Heartbeater(HiveTxnManager txnMgr, Configuration conf) {
 
   /**
    * Send a heartbeat to the metastore for locks and transactions.
-   * @throws IOException
    */
-  public void heartbeat() throws IOException {
+  @Override
+  public void run() {
     if (dontHeartbeat) return;
 
     if (txnMgr == null) {
@@ -63,10 +62,7 @@
     }
 
     if (heartbeatInterval == 0) {
-      // Multiply the heartbeat interval by 1000 to convert to milliseconds,
-      // but divide by 2 to give us a safety factor.
-      heartbeatInterval = HiveConf.getTimeVar(
-          conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) / 2;
+      heartbeatInterval = getHeartbeatInterval(conf);
       if (heartbeatInterval == 0) {
         LOG.warn(HiveConf.ConfVars.HIVE_TXN_MANAGER.toString() + " not set, heartbeats won't be sent");
         dontHeartbeat = true;
@@ -81,10 +77,15 @@
         txnMgr.heartbeat();
       } catch (LockException e) {
         LOG.warn("Failed trying to heartbeat " + e.getMessage());
-        throw new IOException(e);
+        return;
       }
       lastHeartbeat = now;
     }
   }
+
+  public static long getHeartbeatInterval(Configuration conf) {
+    // Multiply the heartbeat interval by 1000 to convert to milliseconds,
+    // but divide by 2 to give us a safety factor.
+    return HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) / 2;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index 5cbf764..7e9c6bf 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -438,7 +438,7 @@ public int execute(DriverContext driverContext) {
         HiveConf.setVar(job, HiveConf.ConfVars.METASTOREPWD, pwd);
       }
 
-      returnVal = jobExecHelper.progress(rj, jc, ctx.getHiveTxnManager());
+      returnVal = jobExecHelper.progress(rj, jc);
       success = (returnVal == 0);
     } catch (Exception e) {
       e.printStackTrace();
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
index 44dfe3e..6ecb598 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
@@ -35,13 +35,11 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.MapRedStats;
-import org.apache.hadoop.hive.ql.exec.Heartbeater;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskHandle;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
-import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.plan.ReducerTimeStatsPerJob;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -231,14 +229,12 @@ private MapRedStats progress(ExecDriverTaskHandle th) throws IOException {
     int numReduce = -1;
     List<ClientStatsPublisher> clientStatPublishers = getClientStatPublishers();
     final boolean localMode = ShimLoader.getHadoopShims().isLocalMode(job);
-    Heartbeater heartbeater = new Heartbeater(th.getTxnManager(), job);
 
     while (!rj.isComplete()) {
       try {
         Thread.sleep(pullInterval);
       } catch (InterruptedException e) {
       }
-      heartbeater.heartbeat();
 
       if (initializing && rj.getJobState() == JobStatus.PREP) {
         // No reason to poll untill the job is initialized
@@ -452,7 +448,6 @@ public void jobInfo(RunningJob rj) {
   private static class ExecDriverTaskHandle extends TaskHandle {
     JobClient jc;
     RunningJob rj;
-    HiveTxnManager txnMgr;
 
     JobClient getJobClient() {
       return jc;
@@ -462,14 +457,9 @@ RunningJob getRunningJob() {
       return rj;
     }
 
-    HiveTxnManager getTxnManager() {
-      return txnMgr;
-    }
-
-    public ExecDriverTaskHandle(JobClient jc, RunningJob rj, HiveTxnManager txnMgr) {
+    public ExecDriverTaskHandle(JobClient jc, RunningJob rj) {
       this.jc = jc;
       this.rj = rj;
-      this.txnMgr = txnMgr;
     }
 
     public void setRunningJob(RunningJob job) {
@@ -523,7 +513,7 @@ public int progressLocal(Process runningJob, String taskId) {
 
   }
 
-  public int progress(RunningJob rj, JobClient jc, HiveTxnManager txnMgr) throws IOException {
+  public int progress(RunningJob rj, JobClient jc) throws IOException {
     jobId = rj.getID();
 
     int returnVal = 0;
@@ -544,7 +534,7 @@ public int progress(RunningJob rj, JobClient jc, HiveTxnManager txnMgr) throws IOException {
 
     runningJobs.add(rj);
 
-    ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj, txnMgr);
+    ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj);
 
     jobInfo(rj);
     MapRedStats mapRedStats = progress(th);
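Note: the change from `throw new IOException(e)` to a bare `return` in `Heartbeater.run()` is deliberate, not just signature cleanup. `Driver` now runs the heartbeater via `scheduleAtFixedRate`, and the JDK executor silently cancels a periodic task whose run lets an exception escape, so one transient metastore hiccup would end all future heartbeats for the query. A minimal, standalone sketch of that JDK behavior (class and message names here are illustrative, not part of this patch):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;

    public class FixedRateFailureDemo {
      public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> task = ses.scheduleAtFixedRate(() -> {
          System.out.println("beat");
          throw new RuntimeException("one transient failure"); // first run throws...
        }, 0, 100, TimeUnit.MILLISECONDS);
        Thread.sleep(500);
        System.out.println(task.isDone()); // ...and the task never fires again: prints true
        ses.shutdown();
      }
    }

Logging the failure and returning keeps the schedule alive, so the next beat can still renew the locks.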
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
index 59e9d29..245e088 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
@@ -38,11 +38,9 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.Heartbeater;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -62,21 +60,6 @@
 import com.google.common.base.Preconditions;
 
-import java.io.IOException;
-import java.io.PrintStream;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
 /**
  * TezJobMonitor keeps track of a tez job while it's being executed. It will
  * print status to the console and retrieve final status of the job after
@@ -221,11 +204,10 @@ public void repositionCursor() {
    * monitorExecution handles status printing, failures during execution and final status retrieval.
    *
    * @param dagClient client that was used to kick off the job
-   * @param txnMgr transaction manager for this operation
    * @param conf configuration file for this operation
    * @return int 0 - success, 1 - killed, 2 - failed
    */
-  public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr, HiveConf conf,
+  public int monitorExecution(final DAGClient dagClient, HiveConf conf,
       DAG dag) throws InterruptedException {
     long monitorStartTime = System.currentTimeMillis();
     DAGStatus status = null;
@@ -239,7 +221,6 @@ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr, HiveConf conf,
     DAGStatus.State lastState = null;
     String lastReport = null;
     Set<StatusGetOpts> opts = new HashSet<StatusGetOpts>();
-    Heartbeater heartbeater = new Heartbeater(txnMgr, conf);
     long startTime = 0;
     boolean isProfileEnabled = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) ||
         Utilities.isPerfOrAboveLogging(conf);
@@ -256,7 +237,6 @@ public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr, HiveConf conf,
         status = dagClient.getDAGStatus(opts, checkInterval);
         Map<String, Progress> progressMap = status.getVertexProgress();
         DAGStatus.State state = status.getState();
-        heartbeater.heartbeat();
 
         if (state != lastState || state == RUNNING) {
           lastState = state;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index a6d911d..e08e628 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -182,7 +182,7 @@ public int execute(DriverContext driverContext) {
 
     // finally monitor will print progress until the job is done
     TezJobMonitor monitor = new TezJobMonitor(work.getWorkMap());
-    rc = monitor.monitorExecution(client, ctx.getHiveTxnManager(), conf, dag);
+    rc = monitor.monitorExecution(client, conf, dag);
     if (rc != 0) {
       this.setException(new HiveException(monitor.getDiagnostics()));
     }
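Note: with the inline `heartbeater.heartbeat()` calls gone from both `HadoopJobExecHelper` and `TezJobMonitor`, the job monitors only poll for status; `Driver` alone owns the heartbeat lifecycle. Condensed from the `Driver` hunks above into one place (a sketch, not a literal excerpt; `HeartbeatLifecycleSketch` is a made-up name):

    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.Heartbeater;
    import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;

    class HeartbeatLifecycleSketch {
      private final ScheduledExecutorService executor; // shared pool, like Driver's static one
      private ScheduledFuture<?> heartbeatTask;

      HeartbeatLifecycleSketch(ScheduledExecutorService executor) {
        this.executor = executor;
      }

      // After locks are acquired: one periodic task per query, beating for its lifetime.
      void start(HiveTxnManager txnMgr, HiveConf conf) {
        long interval = Heartbeater.getHeartbeatInterval(conf); // HIVE_TXN_TIMEOUT / 2
        if (interval > 0) {
          heartbeatTask = executor.scheduleAtFixedRate(
              new Heartbeater(txnMgr, conf), 0, interval, TimeUnit.MILLISECONDS);
        }
      }

      // On close()/destroy()/releaseResources(): stop beating for this query.
      void stop() {
        if (heartbeatTask != null && !heartbeatTask.isDone()) {
          heartbeatTask.cancel(true); // interrupt an in-flight heartbeat rather than wait for it
          heartbeatTask = null;
        }
      }
    }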
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
index 2f09014..439079f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
@@ -149,7 +149,7 @@ public int execute(DriverContext driverContext) {
 
       // Finally SUBMIT the JOB!
       rj = jc.submitJob(job);
-      returnVal = jobExecHelper.progress(rj, jc, null);
+      returnVal = jobExecHelper.progress(rj, jc);
       success = (returnVal == 0);
 
     } catch (Exception e) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
index fd04fb5..da62d00 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
@@ -221,7 +221,7 @@ public int execute(DriverContext driverContext) {
 
       // Finally SUBMIT the JOB!
       rj = jc.submitJob(job);
-      returnVal = jobExecHelper.progress(rj, jc, null);
+      returnVal = jobExecHelper.progress(rj, jc);
       success = (returnVal == 0);
 
     } catch (Exception e) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
index 79b3cfa..d1f9f58 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
@@ -184,7 +184,7 @@ public int execute(DriverContext driverContext) {
 
       // Finally SUBMIT the JOB!
       rj = jc.submitJob(job);
-      returnVal = jobExecHelper.progress(rj, jc, null);
+      returnVal = jobExecHelper.progress(rj, jc);
       success = (returnVal == 0);
 
     } catch (Exception e) {
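Note: the three callers above (`MergeFileTask`, `PartialScanTask`, `ColumnTruncateTask`) always passed `null` for the transaction manager and so never heartbeated anyway; dropping the parameter loses nothing. For reference, the interval math in `Heartbeater.getHeartbeatInterval` can be sanity-checked in isolation; this sketch assumes `hive.txn.timeout` is the key behind `ConfVars.HIVE_TXN_TIMEOUT`:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.exec.Heartbeater;

    public class HeartbeatIntervalCheck {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("hive.txn.timeout", "300s");
        // 300s -> 300000 ms, halved as a safety factor -> a beat every 150000 ms,
        // i.e. twice per timeout window, so a lock cannot expire between beats.
        System.out.println(Heartbeater.getHeartbeatInterval(conf)); // 150000
        conf.set("hive.txn.timeout", "0s");
        // A zero timeout yields 0: Driver logs a warning and never schedules the task.
        System.out.println(Heartbeater.getHeartbeatInterval(conf)); // 0
      }
    }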