diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index bb74d99..dd90d58 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1135,6 +1135,7 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     HIVETESTCURRENTTIMESTAMP("hive.test.currenttimestamp", null, "current timestamp for test", false),
     HIVETESTMODEROLLBACKTXN("hive.test.rollbacktxn", false, "For testing only. Will mark every ACID transaction aborted", false),
     HIVETESTMODEFAILCOMPACTION("hive.test.fail.compaction", false, "For testing only. Will cause CompactorMR to fail.", false),
+    HIVETESTMODEFAILHEARTBEATER("hive.test.fail.heartbeater", false, "For testing only. Will cause Heartbeater to fail.", false),
     HIVEMERGEMAPFILES("hive.merge.mapfiles", true,
         "Merge small files at the end of a map-only job"),
diff --git ql/src/java/org/apache/hadoop/hive/ql/Context.java ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 6f18c82..92b4e5b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -43,9 +43,11 @@
 import org.apache.hadoop.hive.ql.exec.TaskRunner;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager.Heartbeater;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -121,6 +123,8 @@
   private final String stagingDir;
 
+  private Heartbeater heartbeater;
+
   public Context(Configuration conf) throws IOException {
     this(conf, generateExecutionId());
   }
@@ -760,4 +764,18 @@ public AtomicInteger getSequencer() {
   public CompilationOpContext getOpContext() {
     return opContext;
   }
+
+  public Heartbeater getHeartbeater() {
+    return heartbeater;
+  }
+
+  public void setHeartbeater(Heartbeater heartbeater) {
+    this.heartbeater = heartbeater;
+  }
+
+  public void checkHeartbeaterLockException() throws LockException {
+    if (getHeartbeater() != null && getHeartbeater().getLockException() != null) {
+      throw getHeartbeater().getLockException();
+    }
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index 926f6e8..8a6499b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -433,10 +433,11 @@ public int execute(DriverContext driverContext) {
       rj = jc.submitJob(job);
       this.jobID = rj.getJobID();
       updateStatusInQueryDisplay();
-      returnVal = jobExecHelper.progress(rj, jc);
+      returnVal = jobExecHelper.progress(rj, jc, ctx);
       success = (returnVal == 0);
     } catch (Exception e) {
       e.printStackTrace();
+      setException(e);
       String mesg = " with exception '" + Utilities.getNameMessage(e) + "'";
       if (rj != null) {
         mesg = "Ended Job = " + rj.getJobID() + mesg;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
index 11f5cfd..ce9e0e2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.MapRedStats;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -39,6 +40,7 @@
 import org.apache.hadoop.hive.ql.exec.TaskHandle;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.plan.ReducerTimeStatsPerJob;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -213,7 +215,7 @@ public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
     return this.callBackObj.checkFatalErrors(ctrs, errMsg);
   }
 
-  private MapRedStats progress(ExecDriverTaskHandle th) throws IOException {
+  private MapRedStats progress(ExecDriverTaskHandle th) throws IOException, LockException {
     JobClient jc = th.getJobClient();
     RunningJob rj = th.getRunningJob();
     SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
@@ -233,6 +235,10 @@ private MapRedStats progress(ExecDriverTaskHandle th) throws IOException {
     final boolean localMode = ShimLoader.getHadoopShims().isLocalMode(job);
 
     while (!rj.isComplete()) {
+      if (th.getContext() != null) {
+        th.getContext().checkHeartbeaterLockException();
+      }
+
       try {
         Thread.sleep(pullInterval);
       } catch (InterruptedException e) {
@@ -452,6 +458,7 @@ public void jobInfo(RunningJob rj) {
   private static class ExecDriverTaskHandle extends TaskHandle {
     JobClient jc;
     RunningJob rj;
+    Context ctx;
 
     JobClient getJobClient() {
       return jc;
@@ -461,9 +468,14 @@ RunningJob getRunningJob() {
       return rj;
     }
 
-    public ExecDriverTaskHandle(JobClient jc, RunningJob rj) {
+    Context getContext() {
+      return ctx;
+    }
+
+    public ExecDriverTaskHandle(JobClient jc, RunningJob rj, Context ctx) {
       this.jc = jc;
       this.rj = rj;
+      this.ctx = ctx;
     }
 
     public void setRunningJob(RunningJob job) {
@@ -517,7 +529,7 @@ public int progressLocal(Process runningJob, String taskId) {
 
   }
 
-  public int progress(RunningJob rj, JobClient jc) throws IOException {
+  public int progress(RunningJob rj, JobClient jc, Context ctx) throws IOException, LockException {
     jobId = rj.getID();
 
     int returnVal = 0;
@@ -538,7 +550,7 @@ public int progress(RunningJob rj, JobClient jc) throws IOException {
 
     runningJobs.add(rj);
 
-    ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj);
+    ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj, ctx);
     jobInfo(rj);
     MapRedStats mapRedStats = progress(th);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
index b22991c..838f320 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
@@ -38,6 +38,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
@@ -217,7 +218,7 @@ public void repositionCursor() {
    * @return int 0 - success, 1 - killed, 2 - failed
    */
   public int monitorExecution(final DAGClient dagClient, HiveConf conf,
-      DAG dag) throws InterruptedException {
+      DAG dag, Context ctx) throws InterruptedException {
     long monitorStartTime = System.currentTimeMillis();
     DAGStatus status = null;
     completed = new HashSet<String>();
@@ -247,6 +248,10 @@ public int monitorExecution(final DAGClient dagClient, HiveConf conf,
     while (true) {
       try {
+        if (ctx != null) {
+          ctx.checkHeartbeaterLockException();
+        }
+
         status = dagClient.getDAGStatus(opts, checkInterval);
         progressMap = status.getVertexProgress();
         DAGStatus.State state = status.getState();
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 3789ce9..9d69840 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -187,7 +187,7 @@ public int execute(DriverContext driverContext) {
 
       // finally monitor will print progress until the job is done
       TezJobMonitor monitor = new TezJobMonitor(work.getWorkMap());
-      rc = monitor.monitorExecution(dagClient, conf, dag);
+      rc = monitor.monitorExecution(dagClient, conf, dag, ctx);
       if (rc != 0) {
         this.setException(new HiveException(monitor.getDiagnostics()));
       }
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
index 6b0343b..0fedd48 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
@@ -151,7 +151,7 @@ public int execute(DriverContext driverContext) {
       // Finally SUBMIT the JOB!
       rj = jc.submitJob(job);
 
-      returnVal = jobExecHelper.progress(rj, jc);
+      returnVal = jobExecHelper.progress(rj, jc, ctx);
       success = (returnVal == 0);
 
     } catch (Exception e) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
index d31510d..6771b3e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
@@ -223,7 +223,7 @@ public int execute(DriverContext driverContext) {
       // Finally SUBMIT the JOB!
       rj = jc.submitJob(job);
 
-      returnVal = jobExecHelper.progress(rj, jc);
+      returnVal = jobExecHelper.progress(rj, jc, ctx);
       success = (returnVal == 0);
 
     } catch (Exception e) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
index 8acd6e0..ffc6311 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
@@ -187,7 +187,7 @@ public int execute(DriverContext driverContext) {
       // Finally SUBMIT the JOB!
       rj = jc.submitJob(job);
 
-      returnVal = jobExecHelper.progress(rj, jc);
+      returnVal = jobExecHelper.progress(rj, jc, ctx);
       success = (returnVal == 0);
 
     } catch (Exception e) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 9c2a346..0778737 100644
--- ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -76,6 +76,9 @@
    */
   private int statementId = -1;
 
+  // QueryId for the query in the current transaction
+  private String queryId;
+
   // ExecutorService for sending heartbeat to metastore periodically.
 private static ScheduledExecutorService heartbeatExecutorService = null;
  private ScheduledFuture<?> heartbeatTask = null;
@@ -136,7 +139,8 @@ public HiveLockManager getLockManager() throws LockException {
   public void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException {
     try {
       acquireLocks(plan, ctx, username, true);
-      startHeartbeat();
+      ctx.setHeartbeater(startHeartbeat());
+      queryId = plan.getQueryId();
     }
     catch(LockException e) {
       if(e.getCause() instanceof TxnAbortedException) {
@@ -306,6 +310,7 @@ LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isB
   void acquireLocksWithHeartbeatDelay(QueryPlan plan, Context ctx, String username, long delay) throws LockException {
     acquireLocks(plan, ctx, username, true);
     startHeartbeat(delay);
+    queryId = plan.getQueryId();
   }
 
@@ -412,27 +417,49 @@ public void heartbeat() throws LockException {
     }
   }
 
-  private void startHeartbeat() throws LockException {
-    startHeartbeat(0);
+  private Heartbeater startHeartbeat() throws LockException {
+    return startHeartbeat(0);
   }
 
   /**
   * This is for testing only. Normally client should call {@link #startHeartbeat()}
   * Make the heartbeater start before an initial delay period.
   * @param delay time to delay before first execution, in milliseconds
+  * @return the heartbeater
   */
-  void startHeartbeat(long delay) throws LockException {
+  Heartbeater startHeartbeat(long delay) throws LockException {
    long heartbeatInterval = getHeartbeatInterval(conf);
    assert heartbeatInterval > 0;
+    Heartbeater heartbeater = new Heartbeater(this, conf);
    heartbeatTask = heartbeatExecutorService.scheduleAtFixedRate(
-        new Heartbeater(this), delay, heartbeatInterval, TimeUnit.MILLISECONDS);
-    LOG.info("Started " + Heartbeater.class.getName() + " with delay/interval = " +
-        0 + "/" + heartbeatInterval + " " + TimeUnit.MILLISECONDS);
+        heartbeater, delay, heartbeatInterval, TimeUnit.MILLISECONDS);
+    LOG.info("Started heartbeat with delay/interval = " + delay + "/" + heartbeatInterval + " " +
+        TimeUnit.MILLISECONDS + " for query: " + queryId);
+    return heartbeater;
   }
 
-  private void stopHeartbeat() {
-    if (heartbeatTask != null && !heartbeatTask.isCancelled() && !heartbeatTask.isDone()) {
-      heartbeatTask.cancel(false);
+  private void stopHeartbeat() throws LockException {
+    if (heartbeatTask != null) {
+      heartbeatTask.cancel(true);
+      long startTime = System.currentTimeMillis();
+      long sleepInterval = 100;
+      while (!heartbeatTask.isCancelled() && !heartbeatTask.isDone()) {
+        // We will wait for 30 seconds for the task to be cancelled.
+        // If it's still not cancelled (unlikely), we will just move on.
+        long now = System.currentTimeMillis();
+        if (now - startTime > 30000) {
QueryId: " + queryId); + break; + } + try { + Thread.sleep(sleepInterval); + } catch (InterruptedException e) { + } + sleepInterval *= 2; + } + if (heartbeatTask.isCancelled() || heartbeatTask.isDone()) { + LOG.info("Stopped heartbeat for query: " + queryId); + } heartbeatTask = null; } } @@ -551,13 +578,21 @@ public static long getHeartbeatInterval(Configuration conf) throws LockException */ public static class Heartbeater implements Runnable { private HiveTxnManager txnMgr; + private HiveConf conf; + + LockException lockException; + public LockException getLockException() { + return lockException; + } /** * * @param txnMgr transaction manager for this operation */ - public Heartbeater(HiveTxnManager txnMgr) { + public Heartbeater(HiveTxnManager txnMgr, HiveConf conf) { this.txnMgr = txnMgr; + this.conf = conf; + lockException = null; } /** @@ -566,10 +601,16 @@ public Heartbeater(HiveTxnManager txnMgr) { @Override public void run() { try { + // For negative testing purpose.. + if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER)) { + throw new LockException(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER.name() + "=true"); + } + LOG.debug("Heartbeating..."); txnMgr.heartbeat(); } catch (LockException e) { LOG.error("Failed trying to heartbeat " + e.getMessage()); + lockException = e; } } } diff --git ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 472da0b..903337d 100644 --- ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -51,6 +51,7 @@ import org.junit.rules.TestName; import java.io.File; +import java.io.IOException; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Arrays; @@ -746,6 +747,27 @@ public void writeBetweenWorkerAndCleaner() throws Exception { Assert.assertEquals("", expected, runStatementOnDriver("select a,b from " + tblName + " order by a")); } + + /** + * Simulate the scenario when a heartbeat failed due to client errors such as no locks or no txns being found. + * When a heartbeat fails, the query should be failed too. + * @throws Exception + */ + @Test + public void testFailHeartbeater() throws Exception { + // Fail heartbeater, so that we can get a RuntimeException from the query. + // More specifically, it's the original IOException thrown by either MR's or Tez's progress monitoring loop. 
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER, true);
+    Exception exception = null;
+    try {
+      runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(new int[][]{{1, 2}, {3, 4}}));
+    } catch (RuntimeException e) {
+      exception = e;
+    }
+    Assert.assertNotNull(exception);
+    Assert.assertTrue(exception.getMessage().contains("HIVETESTMODEFAILHEARTBEATER=true"));
+  }
+
   /**
    * takes raw data and turns it into a string as if from Driver.getResults()
    * sorts rows in dictionary order
diff --git ql/src/test/results/clientnegative/index_compact_entry_limit.q.out ql/src/test/results/clientnegative/index_compact_entry_limit.q.out
index b65f94e..f844ee4 100644
--- ql/src/test/results/clientnegative/index_compact_entry_limit.q.out
+++ ql/src/test/results/clientnegative/index_compact_entry_limit.q.out
@@ -34,4 +34,4 @@ PREHOOK: type: QUERY
 PREHOOK: Input: default@src
 #### A masked pattern was here ####
 Job Submission failed with exception 'java.io.IOException(org.apache.hadoop.hive.ql.metadata.HiveException: Number of compact index entries loaded during the query exceeded the maximum of 5 set in hive.index.compact.query.max.entries)'
-FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
+FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask. org.apache.hadoop.hive.ql.metadata.HiveException: Number of compact index entries loaded during the query exceeded the maximum of 5 set in hive.index.compact.query.max.entries
diff --git ql/src/test/results/clientnegative/index_compact_size_limit.q.out ql/src/test/results/clientnegative/index_compact_size_limit.q.out
index 299cc47..9ff8f8f 100644
--- ql/src/test/results/clientnegative/index_compact_size_limit.q.out
+++ ql/src/test/results/clientnegative/index_compact_size_limit.q.out
@@ -34,4 +34,4 @@ PREHOOK: type: QUERY
 PREHOOK: Input: default@src
 #### A masked pattern was here ####
 Job Submission failed with exception 'java.io.IOException(Size of data to read during a compact-index-based query exceeded the maximum of 1024 set in hive.index.compact.query.max.size)'
-FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
+FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask. Size of data to read during a compact-index-based query exceeded the maximum of 1024 set in hive.index.compact.query.max.size
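
The pattern this patch wires through in a nutshell: the scheduled Heartbeater records the LockException from a failed heartbeat instead of only logging it, Context exposes that exception via checkHeartbeaterLockException(), and the MR and Tez progress loops (HadoopJobExecHelper.progress() and TezJobMonitor.monitorExecution()) poll it between status checks so a dead transaction fails the query promptly. Below is a minimal, self-contained sketch of that pattern; it is not part of the patch, and every name in it is a simplified stand-in for the real Hive types named in the comments.

// HeartbeaterSketch.java -- illustrative sketch only, NOT part of the patch.
// Stand-ins: LockException for org.apache.hadoop.hive.ql.lockmgr.LockException,
// Heartbeater for DbTxnManager.Heartbeater, and the main loop for the MR/Tez
// progress monitors polling Context.checkHeartbeaterLockException().
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class HeartbeaterSketch {

  // Stand-in for LockException: a checked exception raised by a failed heartbeat.
  static class LockException extends Exception {
    LockException(String msg) { super(msg); }
  }

  // Stand-in for DbTxnManager.Heartbeater: instead of swallowing a failed
  // heartbeat, it records the exception so the query thread can observe it.
  static class Heartbeater implements Runnable {
    // volatile: written by the scheduler thread, read by the query thread
    private volatile LockException lockException;

    LockException getLockException() { return lockException; }

    @Override public void run() {
      try {
        // The real code calls txnMgr.heartbeat(); here we simulate the
        // metastore rejecting it (e.g. the lock or txn no longer exists).
        throw new LockException("No such lock or transaction");
      } catch (LockException e) {
        lockException = e;  // record it instead of only logging it
      }
    }
  }

  public static void main(String[] args) throws Exception {
    ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
    Heartbeater hb = new Heartbeater();
    ScheduledFuture<?> task = pool.scheduleAtFixedRate(hb, 0, 100, TimeUnit.MILLISECONDS);
    try {
      // Stand-in for the MR/Tez progress loop: between status checks, poll the
      // heartbeater and fail the "query" as soon as a heartbeat has failed.
      while (true) {
        if (hb.getLockException() != null) {
          throw hb.getLockException();
        }
        Thread.sleep(50);  // rj.isComplete() / dagClient.getDAGStatus() goes here
      }
    } catch (LockException e) {
      System.out.println("Query aborted: " + e.getMessage());
    } finally {
      task.cancel(true);
      pool.shutdownNow();
    }
  }
}

Polling from the existing monitor loops, rather than interrupting the query thread from the heartbeater, surfaces the failure at a well-defined point without adding cross-thread signalling to either execution engine.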