commit 11ee628e04a7fe5dcf8354a5c24c379dd8547b4a
Author: Sahil Takiar
Date:   Mon Mar 26 18:49:59 2018 -0700

    HIVE-18684: Race condition in RemoteSparkJobMonitor
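
    RemoteSparkJobMonitor only printed the Spark web UI URL, the YARN
    application id and kill command, the job stages, and the progress format
    legend the first time it observed the job in the RUNNING state. Because
    the monitor polls the remote job state between sleep intervals, a
    sufficiently short-lived job can move from SENT straight to SUCCEEDED or
    FAILED without the monitor ever seeing RUNNING, so none of that
    information was printed. The SUCCEEDED and FAILED branches now print it
    when the job was never observed running (best effort in the FAILED case,
    so the original job error still propagates), and the console messages
    are extracted into constants so that the new TestRemoteSparkJobMonitor
    can assert on the monitor's output.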
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index 3467ae4048..166a4df226 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -36,6 +36,17 @@
  * It print current job status to console and sleep current thread between monitor interval.
  */
 public class RemoteSparkJobMonitor extends SparkJobMonitor {
+
+  static final String LOG_WEB_URL = "Hive on Spark Session Web UI URL: ";
+  static final String LOG_APP_ID = "Running with YARN Application = ";
+  static final String LOG_APP_KILL = " application -kill ";
+  static final String LOG_STAGES_FORMAT = "Hive on Spark job[%s] stages: %s";
+  static final String LOG_JOB_PROGRESS_FORMAT = "Job Progress Format\nCurrentTime " +
+      "StageId_StageAttemptId: " +
+      "SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount";
+  static final String LOG_JOB_FINISHED_FORMAT = "Spark job[%s] finished successfully in ";
+  static final String LOG_JOB_FAILED_MESSAGE = "Job failed with ";
+
   private int sparkJobMaxTaskCount = -1;
   private int sparkStageMaxTaskCount = -1;
   private int totalTaskCount = 0;
@@ -92,22 +103,14 @@ public int startMonitor() {
             Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
             if (!running) {
               perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
+              printAppInfo();
-              console.printInfo("Hive on Spark Session Web UI URL: " + sparkJobStatus.getWebUIURL());
-
-              // print job stages.
-              console.printInfo("\nQuery Hive on Spark job[" + sparkJobStatus.getJobId() +
-                  "] stages: " + Arrays.toString(sparkJobStatus.getStageIds()));
+              printJobStages();
               console.printInfo("Spark job[" + sparkJobStatus.getJobId() + "] status = RUNNING");
               running = true;
 
-              String format = "Job Progress Format\nCurrentTime StageId_StageAttemptId: "
-                  + "SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount";
-              if (!inPlaceUpdate) {
-                console.printInfo(format);
-              } else {
-                console.logInfo(format);
-              }
+              printJobProgressFormat();
             } else {
               // Get the maximum of the number of tasks in the stages of the job and cancel the job if it goes beyond the limit.
               if (sparkStageMaxTaskCount != -1 && stageMaxTaskCount == 0) {
@@ -137,19 +140,38 @@ public int startMonitor() {
             }
             break;
           case SUCCEEDED:
+            // It's possible for the Spark job to transition into a succeeded state without ever
+            // entering the running state (e.g. a very short lived Spark job)
+            if (!running) {
+              printAppInfo();
+              printJobStages();
+              printJobProgressFormat();
+            }
+
             Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
             printStatus(progressMap, lastProgressMap);
             lastProgressMap = progressMap;
             double duration = (System.currentTimeMillis() - startTime) / 1000.0;
-            console.printInfo("Spark job[" + sparkJobStatus.getJobId() + "] finished successfully in "
-                + String.format("%.2f second(s)", duration));
+            console.printInfo(String.format(LOG_JOB_FINISHED_FORMAT, sparkJobStatus.getJobId()) +
+                String.format("%.2f second(s)", duration));
             running = false;
             done = true;
             break;
           case FAILED:
+            if (!running) {
+              // Only print these on a best effort basis, as we want to ensure we properly
+              // propagate any exceptions from the Spark job
+              try {
+                printAppInfo();
+                printJobStages();
+              } catch (Exception e) {
+                // no-op
+              }
+            }
+
             String detail = sparkJobStatus.getError().getMessage();
             StringBuilder errBuilder = new StringBuilder();
-            errBuilder.append("Job failed with ");
+            errBuilder.append(LOG_JOB_FAILED_MESSAGE);
             if (detail == null) {
               errBuilder.append("UNKNOWN reason");
             } else {
@@ -215,14 +237,29 @@ public int startMonitor() {
     return rc;
   }
 
+  private void printJobProgressFormat() {
+    if (!inPlaceUpdate) {
+      console.printInfo(LOG_JOB_PROGRESS_FORMAT);
+    } else {
+      console.logInfo(LOG_JOB_PROGRESS_FORMAT);
+    }
+  }
+
+  private void printJobStages() throws HiveException {
+    console.printInfo(String.format(LOG_STAGES_FORMAT, sparkJobStatus.getJobId(),
+        Arrays.toString(sparkJobStatus.getStageIds())));
+  }
+
   private void printAppInfo() {
+    console.printInfo(LOG_WEB_URL + sparkJobStatus.getWebUIURL());
+
     String sparkMaster = hiveConf.get("spark.master");
     if (sparkMaster != null && sparkMaster.startsWith("yarn")) {
       String appID = sparkJobStatus.getAppID();
       if (appID != null) {
-        console.printInfo("Running with YARN Application = " + appID);
+        console.printInfo(LOG_APP_ID + appID);
         console.printInfo("Kill Command = " +
-            HiveConf.getVar(hiveConf, HiveConf.ConfVars.YARNBIN) + " application -kill " + appID);
+            HiveConf.getVar(hiveConf, HiveConf.ConfVars.YARNBIN) + LOG_APP_KILL + appID);
       }
     }
   }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/status/TestRemoteSparkJobMonitor.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/status/TestRemoteSparkJobMonitor.java
new file mode 100644
index 0000000000..6db589e609
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/status/TestRemoteSparkJobMonitor.java
@@ -0,0 +1,132 @@
+package org.apache.hadoop.hive.ql.exec.spark.status;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.spark.client.JobHandle;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestRemoteSparkJobMonitor {
+
+  @Test
+  public void testPrintAppAndJobInfoSentToSucceeded() throws ExecutionException,
+      InterruptedException, UnsupportedEncodingException, HiveException {
+    HiveConf hiveConf = new HiveConf();
+    hiveConf.set("spark.master", "yarn");
+
+    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    final PrintStream ps = new PrintStream(baos, true, StandardCharsets.UTF_8.displayName());
+
+    String appId = "my-test-app-id";
+    String webURL = "my-test-web-url";
+    int jobId = 1;
+    int[] stageIds = {1, 2, 3};
+
+    RemoteSparkJobStatus remoteJobStatus = createMockRemoteSparkJobStatus(appId, webURL, jobId,
+        stageIds);
+    SparkJobMonitor jobMonitor = new RemoteSparkJobMonitor(hiveConf, remoteJobStatus);
+    Future<Integer> monitorFuture = startMonitor(hiveConf, jobMonitor, ps);
+
+    Thread.sleep(5000);
+
+    when(remoteJobStatus.getRemoteJobState()).thenReturn(JobHandle.State.SUCCEEDED);
+
+    Assert.assertEquals(0, (int) monitorFuture.get());
+
+    String jobMonitorOutput = baos.toString();
+    validateCommonLogOutput(jobMonitorOutput, appId, webURL, jobId, stageIds);
+
+    Assert.assertTrue(jobMonitorOutput.contains(RemoteSparkJobMonitor.LOG_JOB_PROGRESS_FORMAT));
+    Assert.assertTrue(jobMonitorOutput.contains(
+        String.format(RemoteSparkJobMonitor.LOG_JOB_FINISHED_FORMAT, jobId)));
+    Assert.assertTrue(jobMonitorOutput.contains("Stage-1_0: 10/10 Finished\tStage-2_0: 10/10 " +
+        "Finished\tStage-3_0: 15/15 Finished"));
+  }
+
+  @Test
+  public void testPrintAppAndJobInfoSentToFailed() throws ExecutionException,
+      InterruptedException, HiveException, UnsupportedEncodingException {
+    HiveConf hiveConf = new HiveConf();
+    hiveConf.set("spark.master", "yarn");
+
+    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    final PrintStream ps = new PrintStream(baos, true, StandardCharsets.UTF_8.displayName());
+
+    String appId = "my-test-app-id";
+    String webURL = "my-test-web-url";
+    int jobId = 1;
+    int[] stageIds = {1, 2, 3};
+
+    RemoteSparkJobStatus remoteJobStatus = createMockRemoteSparkJobStatus(appId, webURL, jobId,
+        stageIds);
+    when(remoteJobStatus.getError()).thenReturn(new Throwable(
+        Throwables.getStackTraceAsString(new Throwable())));
+    SparkJobMonitor jobMonitor = new RemoteSparkJobMonitor(hiveConf, remoteJobStatus);
+    Future<Integer> monitorFuture = startMonitor(hiveConf, jobMonitor, ps);
+
+    Thread.sleep(5000);
+
+    when(remoteJobStatus.getRemoteJobState()).thenReturn(JobHandle.State.FAILED);
+
+    Assert.assertEquals(3, (int) monitorFuture.get());
+
+    String jobMonitorOutput = baos.toString();
+    validateCommonLogOutput(jobMonitorOutput, appId, webURL, jobId, stageIds);
+    Assert.assertTrue(jobMonitorOutput.contains(RemoteSparkJobMonitor.LOG_JOB_FAILED_MESSAGE +
+        Throwable.class.getName()));
+  }
+
+  private void validateCommonLogOutput(String jobMonitorOutput, String appId, String webURL,
+      int jobId, int[] stageIds) {
+    Assert.assertTrue(jobMonitorOutput.contains(RemoteSparkJobMonitor.LOG_APP_ID + appId));
+    Assert.assertTrue(jobMonitorOutput.contains(RemoteSparkJobMonitor.LOG_APP_KILL + appId));
+    Assert.assertTrue(jobMonitorOutput.contains(RemoteSparkJobMonitor.LOG_WEB_URL + webURL));
+    Assert.assertTrue(jobMonitorOutput.contains(
+        String.format(RemoteSparkJobMonitor.LOG_STAGES_FORMAT, jobId,
+            Arrays.toString(stageIds))));
+  }
+
+  private Future<Integer> startMonitor(HiveConf hiveConf, SparkJobMonitor jobMonitor,
+      PrintStream ps) {
+    return CompletableFuture.supplyAsync(
+        () -> {
+          SessionState.start(hiveConf);
+          SessionState.get().info = ps;
+          SessionState.get().err = ps;
+          return jobMonitor.startMonitor();
+        });
+  }
+
+  private RemoteSparkJobStatus createMockRemoteSparkJobStatus(String appId, String webURL,
+      int jobId, int[] stageIds) throws HiveException {
+    RemoteSparkJobStatus remoteJobStatus = mock(RemoteSparkJobStatus.class);
+    when(remoteJobStatus.isRemoteActive()).thenReturn(true);
+    when(remoteJobStatus.getRemoteJobState()).thenReturn(JobHandle.State.SENT);
+    when(remoteJobStatus.getAppID()).thenReturn(appId);
+    when(remoteJobStatus.getWebUIURL()).thenReturn(webURL);
+    when(remoteJobStatus.getJobId()).thenReturn(jobId);
+    when(remoteJobStatus.getStageIds()).thenReturn(stageIds);
+    when(remoteJobStatus.getSparkStageProgress()).thenReturn(ImmutableMap.of(
+        "1_0", new SparkStageProgress(10, 10, 0, 0),
+        "2_0", new SparkStageProgress(10, 10, 0, 0),
+        "3_0", new SparkStageProgress(15, 15, 0, 0)));
+    return remoteJobStatus;
+  }
+}
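
For readers who want the race in isolation: below is a minimal, self-contained
sketch of why a polling monitor can miss the RUNNING state of a short-lived
job. This is an editor's hypothetical illustration, not part of the commit or
of Hive; the class, enum, and variable names are invented. It models the same
window the patch handles in the SUCCEEDED and FAILED branches: the monitor
samples the remote state once per interval, so any state the job passes
through between two samples is never observed.

import java.util.concurrent.atomic.AtomicReference;

public class PollingRaceSketch {
  enum State { SENT, RUNNING, SUCCEEDED }

  public static void main(String[] args) throws InterruptedException {
    AtomicReference<State> remoteState = new AtomicReference<>(State.SENT);

    // The "job" thread: a very short-lived job that passes through RUNNING
    // far faster than the monitor's polling interval.
    Thread job = new Thread(() -> {
      remoteState.set(State.RUNNING);
      remoteState.set(State.SUCCEEDED);
    });
    job.start();

    // The "monitor" thread: samples the state once per second, the way
    // RemoteSparkJobMonitor polls getRemoteJobState(). It almost always
    // observes SENT and then SUCCEEDED, never RUNNING.
    boolean sawRunning = false;
    State observed;
    do {
      observed = remoteState.get();
      if (observed == State.RUNNING) {
        sawRunning = true; // where "print once on first RUNNING" logic would fire
      }
      Thread.sleep(1000);
    } while (observed != State.SUCCEEDED);

    // Typically prints "saw RUNNING? false": the one-time setup output
    // guarded by that flag would have been skipped entirely.
    System.out.println("saw RUNNING? " + sawRunning);
  }
}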