diff --git ql/src/java/org/apache/hadoop/hive/ql/Context.java ql/src/java/org/apache/hadoop/hive/ql/Context.java index 6f18c82..0945580 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hive.ql.exec.TaskRunner; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager.Heartbeater; import org.apache.hadoop.hive.ql.lockmgr.HiveLock; import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj; import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; @@ -121,6 +122,8 @@ private final String stagingDir; + private Heartbeater heartbeater; + public Context(Configuration conf) throws IOException { this(conf, generateExecutionId()); } @@ -760,4 +763,12 @@ public AtomicInteger getSequencer() { public CompilationOpContext getOpContext() { return opContext; } + + public Heartbeater getHeartbeater() { + return heartbeater; + } + + public void setHeartbeater(Heartbeater heartbeater) { + this.heartbeater = heartbeater; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java index d164859..da20471 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java @@ -432,7 +432,7 @@ public int execute(DriverContext driverContext) { rj = jc.submitJob(job); this.jobID = rj.getJobID(); - returnVal = jobExecHelper.progress(rj, jc); + returnVal = jobExecHelper.progress(rj, jc, ctx); success = (returnVal == 0); } catch (Exception e) { e.printStackTrace(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java index 1b296b9..13ed73e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java +++ 
ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.MapRedStats; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Task; @@ -230,6 +231,12 @@ private MapRedStats progress(ExecDriverTaskHandle th) throws IOException { final boolean localMode = ShimLoader.getHadoopShims().isLocalMode(job); while (!rj.isComplete()) { + if (th.getContext() != null && th.getContext().getHeartbeater() != null && + th.getContext().getHeartbeater().getLockException() != null) { + throw new IOException("Need to abort execution due to LockException: " + + th.getContext().getHeartbeater().getLockException().getMessage(), th.getContext().getHeartbeater().getLockException()); + } + try { Thread.sleep(pullInterval); } catch (InterruptedException e) { @@ -447,6 +454,7 @@ public void jobInfo(RunningJob rj) { private static class ExecDriverTaskHandle extends TaskHandle { JobClient jc; RunningJob rj; + Context ctx; JobClient getJobClient() { return jc; @@ -456,9 +464,14 @@ RunningJob getRunningJob() { return rj; } - public ExecDriverTaskHandle(JobClient jc, RunningJob rj) { + Context getContext() { + return ctx; + } + + public ExecDriverTaskHandle(JobClient jc, RunningJob rj, Context ctx) { this.jc = jc; this.rj = rj; + this.ctx = ctx; } public void setRunningJob(RunningJob job) { @@ -512,7 +525,7 @@ public int progressLocal(Process runningJob, String taskId) { } - public int progress(RunningJob rj, JobClient jc) throws IOException { + public int progress(RunningJob rj, JobClient jc, Context ctx) throws IOException { jobId = rj.getID(); int returnVal = 0; @@ -533,7 +546,7 @@ public int progress(RunningJob rj, JobClient jc) throws IOException { runningJobs.add(rj); - ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj); + ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj, ctx); jobInfo(rj); 
MapRedStats mapRedStats = progress(th); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java index 7a7e2ac..7489302 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.counters.LlapIOCounters; +import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.MapOperator; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; @@ -208,7 +209,7 @@ public void repositionCursor() { * @return int 0 - success, 1 - killed, 2 - failed */ public int monitorExecution(final DAGClient dagClient, HiveConf conf, - DAG dag) throws InterruptedException { + DAG dag, Context ctx) throws InterruptedException { long monitorStartTime = System.currentTimeMillis(); DAGStatus status = null; completed = new HashSet(); @@ -238,6 +239,12 @@ public int monitorExecution(final DAGClient dagClient, HiveConf conf, while (true) { try { + if (ctx != null && ctx.getHeartbeater() != null && + ctx.getHeartbeater().getLockException() != null) { + throw new IOException("Need to abort execution due to LockException: " + + ctx.getHeartbeater().getLockException().getMessage(), ctx.getHeartbeater().getLockException()); + } + status = dagClient.getDAGStatus(opts, checkInterval); progressMap = status.getVertexProgress(); DAGStatus.State state = status.getState(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 3789ce9..9d69840 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -187,7 +187,7 @@ public int execute(DriverContext driverContext) { // finally monitor will print progress until the job is done TezJobMonitor monitor = 
new TezJobMonitor(work.getWorkMap()); - rc = monitor.monitorExecution(dagClient, conf, dag); + rc = monitor.monitorExecution(dagClient, conf, dag, ctx); if (rc != 0) { this.setException(new HiveException(monitor.getDiagnostics())); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java index 82629c1..9e0ca2b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java @@ -150,7 +150,7 @@ public int execute(DriverContext driverContext) { // Finally SUBMIT the JOB! rj = jc.submitJob(job); - returnVal = jobExecHelper.progress(rj, jc); + returnVal = jobExecHelper.progress(rj, jc, ctx); success = (returnVal == 0); } catch (Exception e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java index 71371a3..c94a2f7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java @@ -222,7 +222,7 @@ public int execute(DriverContext driverContext) { // Finally SUBMIT the JOB! rj = jc.submitJob(job); - returnVal = jobExecHelper.progress(rj, jc); + returnVal = jobExecHelper.progress(rj, jc, ctx); success = (returnVal == 0); } catch (Exception e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java index bc21da0..568e1fb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java @@ -186,7 +186,7 @@ public int execute(DriverContext driverContext) { // Finally SUBMIT the JOB! 
rj = jc.submitJob(job); - returnVal = jobExecHelper.progress(rj, jc); + returnVal = jobExecHelper.progress(rj, jc, ctx); success = (returnVal == 0); } catch (Exception e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index e8ebe55..8e7c62a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -132,7 +132,7 @@ public HiveLockManager getLockManager() throws LockException { @Override public void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException { acquireLocks(plan, ctx, username, true); - startHeartbeat(); + ctx.setHeartbeater(startHeartbeat()); } /** @@ -387,27 +387,50 @@ public void heartbeat() throws LockException { } } - private void startHeartbeat() throws LockException { - startHeartbeat(0); + private Heartbeater startHeartbeat() throws LockException { + return startHeartbeat(0); } /** * This is for testing only. Normally client should call {@link #startHeartbeat()} * Make the heartbeater start before an initial delay period. 
* @param delay time to delay before first execution, in milliseconds + * @return heartbeater */ - void startHeartbeat(long delay) throws LockException { + Heartbeater startHeartbeat(long delay) throws LockException { long heartbeatInterval = getHeartbeatInterval(conf); assert heartbeatInterval > 0; + Heartbeater heartbeater = new Heartbeater(this); heartbeatTask = heartbeatExecutorService.scheduleAtFixedRate( - new Heartbeater(this), delay, heartbeatInterval, TimeUnit.MILLISECONDS); + heartbeater, delay, heartbeatInterval, TimeUnit.MILLISECONDS); LOG.info("Started " + Heartbeater.class.getName() + " with delay/interval = " + 0 + "/" + heartbeatInterval + " " + TimeUnit.MILLISECONDS); + return heartbeater; } - private void stopHeartbeat() { - if (heartbeatTask != null && !heartbeatTask.isCancelled() && !heartbeatTask.isDone()) { - heartbeatTask.cancel(false); + private void stopHeartbeat() throws LockException { + if (heartbeatTask != null) { + heartbeatTask.cancel(true); + long startTime = System.currentTimeMillis(); + long sleepInterval = 100; + while (!heartbeatTask.isCancelled() && !heartbeatTask.isDone()) { + // We will wait for 30 seconds for the task to be cancelled. + // If it's still not cancelled (unlikely), we will just move on. 
+ long now = System.currentTimeMillis(); + if (now - startTime > 30000) { + LOG.warn("Heartbeat task cannot be cancelled for unknown reason"); + break; + } + try { + Thread.sleep(sleepInterval); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); break; + } + sleepInterval *= 2; + } + if (heartbeatTask.isCancelled() || heartbeatTask.isDone()) { + LOG.info("Stopped " + Heartbeater.class.getName()); + } heartbeatTask = null; } } @@ -527,12 +550,18 @@ public static long getHeartbeatInterval(Configuration conf) throws LockException public static class Heartbeater implements Runnable { private HiveTxnManager txnMgr; + private volatile LockException lockException; + public LockException getLockException() { return lockException; } + /** * * @param txnMgr transaction manager for this operation */ public Heartbeater(HiveTxnManager txnMgr) { this.txnMgr = txnMgr; + lockException = null; } /** @@ -545,6 +574,7 @@ public void run() { txnMgr.heartbeat(); } catch (LockException e) { LOG.error("Failed trying to heartbeat " + e.getMessage()); + lockException = e; } } }