diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java new file mode 100644 index 0000000..7fdb4e7 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Heartbeater.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; +import org.apache.hadoop.hive.ql.lockmgr.LockException; + +import java.io.IOException; + +/** + * Class to handle heartbeats for MR and Tez tasks. 
+ */ +public class Heartbeater { + private long lastHeartbeat = 0; + private long heartbeatInterval = 0; + private boolean dontHeartbeat = false; + private HiveTxnManager txnMgr; + private Configuration conf; + + static final private Log LOG = LogFactory.getLog(Heartbeater.class.getName()); + + /** + * + * @param txnMgr transaction manager for this operation + * @param conf Configuration for this operation + */ + public Heartbeater(HiveTxnManager txnMgr, Configuration conf) { + this.txnMgr = txnMgr; + this.conf = conf; + } + + /** + * Send a heartbeat to the metastore for locks and transactions. + * @throws IOException + */ + public void heartbeat() throws IOException { + if (dontHeartbeat) return; + + if (txnMgr == null) { + LOG.debug("txnMgr null, not heartbeating"); + dontHeartbeat = true; + return; + } + + if (heartbeatInterval == 0) { + // Multiply the heartbeat interval by 1000 to convert to milliseconds, + // but divide by 2 to give us a safety factor. + heartbeatInterval = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT) * 500; + if (heartbeatInterval == 0) { + LOG.warn(HiveConf.ConfVars.HIVE_TXN_TIMEOUT.toString() + " not set, heartbeats won't be sent"); + dontHeartbeat = true; + LOG.debug("heartbeat interval 0, not heartbeating"); + return; + } + } + long now = System.currentTimeMillis(); + if (now - lastHeartbeat > heartbeatInterval) { + try { + LOG.debug("heartbeating"); + txnMgr.heartbeat(); + } catch (LockException e) { + LOG.warn("Failed trying to heartbeat " + e.getMessage()); + throw new IOException(e); + } + lastHeartbeat = now; + } + } + +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java index 288da8e..dc9f36a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java @@ -422,7 +422,7 @@ public int execute(DriverContext driverContext) { HiveConf.setVar(job, 
HiveConf.ConfVars.METASTOREPWD, pwd); } - returnVal = jobExecHelper.progress(rj, jc); + returnVal = jobExecHelper.progress(rj, jc, ctx.getHiveTxnManager()); success = (returnVal == 0); } catch (Exception e) { e.printStackTrace(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java index a4585de..ee2e9e1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java @@ -36,11 +36,14 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.MapRedStats; +import org.apache.hadoop.hive.ql.exec.Heartbeater; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskHandle; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.history.HiveHistory.Keys; +import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; +import org.apache.hadoop.hive.ql.lockmgr.LockException; import org.apache.hadoop.hive.ql.plan.ReducerTimeStatsPerJob; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; @@ -71,6 +74,7 @@ private LogHelper console; private HadoopJobExecHook callBackObj; + /** * Update counters relevant to this task. 
*/ @@ -231,11 +235,14 @@ private MapRedStats progress(ExecDriverTaskHandle th) throws IOException { int numReduce = -1; List clientStatPublishers = getClientStatPublishers(); + Heartbeater heartbeater = new Heartbeater(th.getTxnManager(), job); + while (!rj.isComplete()) { try { Thread.sleep(pullInterval); } catch (InterruptedException e) { } + heartbeater.heartbeat(); if (initializing && rj.getJobState() == JobStatus.PREP) { // No reason to poll untill the job is initialized @@ -415,6 +422,7 @@ private MapRedStats progress(ExecDriverTaskHandle th) throws IOException { return mapRedStats; } + private String getId() { return this.task.getId(); } @@ -444,6 +452,7 @@ public void jobInfo(RunningJob rj) { private static class ExecDriverTaskHandle extends TaskHandle { JobClient jc; RunningJob rj; + HiveTxnManager txnMgr; JobClient getJobClient() { return jc; @@ -453,9 +462,14 @@ RunningJob getRunningJob() { return rj; } - public ExecDriverTaskHandle(JobClient jc, RunningJob rj) { + HiveTxnManager getTxnManager() { + return txnMgr; + } + + public ExecDriverTaskHandle(JobClient jc, RunningJob rj, HiveTxnManager txnMgr) { this.jc = jc; this.rj = rj; + this.txnMgr = txnMgr; } public void setRunningJob(RunningJob job) { @@ -508,7 +522,7 @@ public int progressLocal(Process runningJob, String taskId) { } - public int progress(RunningJob rj, JobClient jc) throws IOException { + public int progress(RunningJob rj, JobClient jc, HiveTxnManager txnMgr) throws IOException { jobId = rj.getJobID(); int returnVal = 0; @@ -529,7 +543,7 @@ public int progress(RunningJob rj, JobClient jc) throws IOException { runningJobKillURIs.put(rj.getJobID(), rj.getTrackingURL() + "&action=kill"); - ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj); + ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj, txnMgr); jobInfo(rj); MapRedStats mapRedStats = progress(th); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java 
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java index 9211d6c..c97f595 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java @@ -32,6 +32,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.Heartbeater; +import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.tez.dag.api.TezException; @@ -93,9 +96,12 @@ public TezJobMonitor() { * status retrieval. * * @param dagClient client that was used to kick off the job + * @param txnMgr transaction manager for this operation + * @param conf Hive configuration for this operation * @return int 0 - success, 1 - killed, 2 - failed */ - public int monitorExecution(final DAGClient dagClient) throws InterruptedException { + public int monitorExecution(final DAGClient dagClient, HiveTxnManager txnMgr, + HiveConf conf) throws InterruptedException { DAGStatus status = null; completed = new HashSet(); @@ -106,6 +112,7 @@ public int monitorExecution(final DAGClient dagClient) throws InterruptedExcepti DAGStatus.State lastState = null; String lastReport = null; Set opts = new HashSet(); + Heartbeater heartbeater = new Heartbeater(txnMgr, conf); shutdownList.add(dagClient); @@ -119,6 +126,7 @@ public int monitorExecution(final DAGClient dagClient) throws InterruptedExcepti status = dagClient.getDAGStatus(opts); Map progressMap = status.getVertexProgress(); DAGStatus.State state = status.getState(); + heartbeater.heartbeat(); if (state != lastState || state == RUNNING) { lastState = state; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index dd1ef9d..5dd8f98 100644 --- 
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -147,7 +147,7 @@ public int execute(DriverContext driverContext) { // finally monitor will print progress until the job is done TezJobMonitor monitor = new TezJobMonitor(); - rc = monitor.monitorExecution(client); + rc = monitor.monitorExecution(client, ctx.getHiveTxnManager(), conf); // fetch the counters Set statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS); diff --git ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java index 5a6899c..35f5bd9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java @@ -213,7 +213,7 @@ public int execute(DriverContext driverContext) { // Finally SUBMIT the JOB! rj = jc.submitJob(job); - returnVal = jobExecHelper.progress(rj, jc); + returnVal = jobExecHelper.progress(rj, jc, null); success = (returnVal == 0); } catch (Exception e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java index 4b58d92..b3458d6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java @@ -220,7 +220,7 @@ public int execute(DriverContext driverContext) { // Finally SUBMIT the JOB! 
rj = jc.submitJob(job); - returnVal = jobExecHelper.progress(rj, jc); + returnVal = jobExecHelper.progress(rj, jc, null); success = (returnVal == 0); } catch (Exception e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java index 21b537c..11d15be 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java @@ -189,7 +189,7 @@ public int execute(DriverContext driverContext) { // Finally SUBMIT the JOB! rj = jc.submitJob(job); - returnVal = jobExecHelper.progress(rj, jc); + returnVal = jobExecHelper.progress(rj, jc, null); success = (returnVal == 0); } catch (Exception e) {