commit 98cd681b65ee05f2e562ebba972845f4b005d562
Author: Sahil Takiar
Date:   Mon Mar 26 18:49:59 2018 -0700

    HIVE-18684: Race condition in RemoteSparkJobMonitor

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index 98c228b268..38db636a39 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -36,6 +36,16 @@
  * It print current job status to console and sleep current thread between monitor interval.
  */
 public class RemoteSparkJobMonitor extends SparkJobMonitor {
+
+  static final String LOG_WEB_URL = "Hive on Spark Session Web UI URL: ";
+  static final String LOG_APP_ID = "Running with YARN Application = ";
+  static final String LOG_APP_KILL = " application -kill ";
+  static final String LOG_STAGES_FORMAT = "Hive on Spark job[%s] stages: %s";
+  static final String LOG_JOB_PROGRESS_HEADER = "Job Progress Format";
+  static final String LOG_JOB_PROGRESS_FORMAT = "CurrentTime StageId_StageAttemptId: " +
+      "SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount";
+  static final String LOG_JOB_FINISHED_FORMAT = "Spark job[%s] finished successfully in ";
+
   private int sparkJobMaxTaskCount = -1;
   private int sparkStageMaxTaskCount = -1;
   private int totalTaskCount = 0;
@@ -92,44 +102,17 @@ public int startMonitor() {
           Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
           if (!running) {
             perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
+            printAppInfo();
-            console.printInfo("Hive on Spark Session Web UI URL: " + sparkJobStatus.getWebUIURL());
-            // print job stages.
-            console.printInfo("\nQuery Hive on Spark job[" + sparkJobStatus.getJobId() +
-                "] stages: " + Arrays.toString(sparkJobStatus.getStageIds()));
+            printJobStages();
             console.printInfo("Spark job[" + sparkJobStatus.getJobId() + "] status = RUNNING");
             running = true;
-            String format = "Job Progress Format\nCurrentTime StageId_StageAttemptId: "
-                + "SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount";
-            if (!inPlaceUpdate) {
-              console.printInfo(format);
-            } else {
-              console.logInfo(format);
-            }
-          } else {
-            // Get the maximum of the number of tasks in the stages of the job and cancel the job if it goes beyond the limit.
-            if (sparkStageMaxTaskCount != -1 && stageMaxTaskCount == 0) {
-              stageMaxTaskCount = getStageMaxTaskCount(progressMap);
-              if (stageMaxTaskCount > sparkStageMaxTaskCount) {
-                rc = 4;
-                done = true;
-                console.printInfo("\nThe number of task in one stage of the Spark job [" + stageMaxTaskCount + "] is greater than the limit [" +
-                    sparkStageMaxTaskCount + "]. The Spark job will be cancelled.");
-              }
-            }
-
-            // Count the number of tasks, and kill application if it goes beyond the limit.
-            if (sparkJobMaxTaskCount != -1 && totalTaskCount == 0) {
-              totalTaskCount = getTotalTaskCount(progressMap);
-              if (totalTaskCount > sparkJobMaxTaskCount) {
-                rc = 4;
-                done = true;
-                console.printInfo("\nThe total number of task in the Spark job [" + totalTaskCount + "] is greater than the limit [" +
-                    sparkJobMaxTaskCount + "]. The Spark job will be cancelled.");
-              }
-            }
+            printJobProgressFormat();
+          } else if (exceededMaxStageTasks(progressMap) || exceededMaxJobTasks(progressMap)) {
+            rc = 4;
+            done = true;
           }
           printStatus(progressMap, lastProgressMap);
@@ -137,22 +120,31 @@ public int startMonitor() {
         }
         break;
       case SUCCEEDED:
-        Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
-        printStatus(progressMap, lastProgressMap);
-        lastProgressMap = progressMap;
+        // It's possible for the Spark job to transition into a succeeded state without ever
+        // entering the running state (e.g. a very short lived Spark job)
+        if (!running) {
+          printJobInfo();
+        }
+
+        printStatus(sparkJobStatus.getSparkStageProgress(), lastProgressMap);
+
         double duration = (System.currentTimeMillis() - startTime) / 1000.0;
-        console.printInfo("Spark job[" + sparkJobStatus.getJobId() + "] finished successfully in "
-            + String.format("%.2f second(s)", duration));
+        console.printInfo(String.format(LOG_JOB_FINISHED_FORMAT, sparkJobStatus.getJobId())
+            + String.format("%.2f second(s)", duration));
         running = false;
         done = true;
         break;
       case FAILED:
+        printJobInfoFailedOrCancelled(running);
+
        LOG.error("Spark job[" + sparkJobStatus.getJobId() + "] failed", sparkJobStatus.getSparkJobException());
        running = false;
        done = true;
        rc = 3;
        break;
      case CANCELLED:
+        printJobInfoFailedOrCancelled(running);
+
        console.printInfo("Spark job[" + sparkJobStatus.getJobId() + " was cancelled");
        running = false;
        done = true;
@@ -192,14 +184,97 @@ public int startMonitor() {
     return rc;
   }
+  /**
+   * Get the maximum of the number of tasks in the stages of the job and cancel the job if it goes beyond the limit.
+   */
+  private boolean exceededMaxStageTasks(Map<String, SparkStageProgress> progressMap) {
+    if (sparkStageMaxTaskCount != -1 && stageMaxTaskCount == 0) {
+      stageMaxTaskCount = getStageMaxTaskCount(progressMap);
+      if (stageMaxTaskCount > sparkStageMaxTaskCount) {
+        console.printInfo("\nThe number of task in one stage of the Spark job [" + stageMaxTaskCount + "] is greater than the limit [" +
+            sparkStageMaxTaskCount + "]. The Spark job will be cancelled.");
+        return true;
+      }
+    }
+    return false;
+  }
+
+
+  /**
+   * Count the number of tasks, and kill application if it goes beyond the limit.
+   */
+  private boolean exceededMaxJobTasks(Map<String, SparkStageProgress> progressMap) {
+    if (sparkJobMaxTaskCount != -1 && totalTaskCount == 0) {
+      totalTaskCount = getTotalTaskCount(progressMap);
+      if (totalTaskCount > sparkJobMaxTaskCount) {
+        console.printInfo("\nThe total number of task in the Spark job [" + totalTaskCount + "] is greater than the limit [" +
+            sparkJobMaxTaskCount + "]. The Spark job will be cancelled.");
The Spark job will be cancelled."); + return true; + } + } + return false; + } + + /** + * Print information to the console related to the Spark job being run + */ + private void printJobInfo() throws HiveException { + printAppInfo(); + printJobStages(); + printJobProgressFormat(); + } + + /** + * If the Spark job failed or has been cancelled, print out information to the console related + * to the Spark job + */ + private void printJobInfoFailedOrCancelled(boolean running) { + if (!running) { + // Only print these on a best effort basis, as we want to ensure we properly + // propagate any exceptions from the Spark job + try { + printJobInfo(); + printStatus(sparkJobStatus.getSparkStageProgress(), null); + } catch (Exception e) { + // no-op + } + } + } + + /** + * Print out the progress monitor format for the Spark job + */ + private void printJobProgressFormat() { + if (!inPlaceUpdate) { + console.printInfo(LOG_JOB_PROGRESS_HEADER); + console.printInfo(LOG_JOB_PROGRESS_FORMAT); + } else { + console.logInfo(LOG_JOB_PROGRESS_HEADER); + console.logInfo(LOG_JOB_PROGRESS_FORMAT); + } + } + + /** + * Print all the stages in the Spark job + */ + private void printJobStages() throws HiveException { + console.printInfo(String.format(LOG_STAGES_FORMAT, sparkJobStatus.getJobId(), + Arrays.toString(sparkJobStatus.getStageIds()))); + } + + /** + * Print info about the YARN application that the Spark job is being run in + */ private void printAppInfo() { + console.printInfo(LOG_WEB_URL + sparkJobStatus.getWebUIURL()); + String sparkMaster = hiveConf.get("spark.master"); if (sparkMaster != null && sparkMaster.startsWith("yarn")) { String appID = sparkJobStatus.getAppID(); if (appID != null) { - console.printInfo("Running with YARN Application = " + appID); + console.printInfo(LOG_APP_ID + appID); console.printInfo("Kill Command = " + - HiveConf.getVar(hiveConf, HiveConf.ConfVars.YARNBIN) + " application -kill " + appID); + HiveConf.getVar(hiveConf, HiveConf.ConfVars.YARNBIN) + LOG_APP_KILL + appID); } } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/status/TestRemoteSparkJobMonitor.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/status/TestRemoteSparkJobMonitor.java new file mode 100644 index 0000000000..f90c9c6267 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/status/TestRemoteSparkJobMonitor.java @@ -0,0 +1,132 @@ +package org.apache.hadoop.hive.ql.exec.spark.status; + +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hive.spark.client.JobHandle; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestRemoteSparkJobMonitor { + + @Test + public void testPrintJobInfoSentToSucceeded() throws ExecutionException, + InterruptedException, UnsupportedEncodingException, HiveException { + HiveConf hiveConf = new HiveConf(); + hiveConf.set("spark.master", 
"yarn"); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final PrintStream ps = new PrintStream(baos, true, StandardCharsets.UTF_8.displayName()); + + String appId = "my-test-app-id"; + String webURL = "my-test-web-url"; + int jobId = 1; + int[] stageIds = {1, 2, 3}; + + RemoteSparkJobStatus remoteJobStatus = createMockRemoteSparkJobStatus(appId, webURL, jobId, + stageIds); + SparkJobMonitor jobMonitor = new RemoteSparkJobMonitor(hiveConf, remoteJobStatus); + Future monitorFuture = startMonitor(hiveConf, jobMonitor, ps); + + Thread.sleep(5000); + + when(remoteJobStatus.getRemoteJobState()).thenReturn(JobHandle.State.SUCCEEDED); + + Assert.assertEquals(0, (int) monitorFuture.get()); + + String jobMonitorOutput = baos.toString(); + validateCommonLogOutput(jobMonitorOutput, appId, webURL, jobId, stageIds); + + Assert.assertTrue(jobMonitorOutput.contains( + String.format(RemoteSparkJobMonitor.LOG_JOB_FINISHED_FORMAT, jobId))); + Assert.assertTrue(jobMonitorOutput.contains("Stage-1_0: 10/10 Finished\tStage-2_0: 10/10 " + + "Finished\tStage-3_0: 15/15 Finished")); + } + + @Test + public void testPrintJobInfoSentToFailed() throws ExecutionException, + InterruptedException, HiveException, UnsupportedEncodingException { + HiveConf hiveConf = new HiveConf(); + hiveConf.set("spark.master", "yarn"); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final PrintStream ps = new PrintStream(baos, true, StandardCharsets.UTF_8.displayName()); + + String appId = "my-test-app-id"; + String webURL = "my-test-web-url"; + int jobId = 1; + int[] stageIds = {1, 2, 3}; + + RemoteSparkJobStatus remoteJobStatus = createMockRemoteSparkJobStatus(appId, webURL, jobId, + stageIds); + when(remoteJobStatus.getSparkJobException()).thenReturn( + new Throwable(Throwables.getStackTraceAsString(new Throwable()))); + SparkJobMonitor jobMonitor = new RemoteSparkJobMonitor(hiveConf, remoteJobStatus); + Future monitorFuture = startMonitor(hiveConf, jobMonitor, ps); + + Thread.sleep(5000); + + when(remoteJobStatus.getRemoteJobState()).thenReturn(JobHandle.State.FAILED); + + Assert.assertEquals(3, (int) monitorFuture.get()); + + String jobMonitorOutput = baos.toString(); + validateCommonLogOutput(jobMonitorOutput, appId, webURL, jobId, stageIds); + } + + private void validateCommonLogOutput(String jobMonitorOutput, String appId, String webURL, + int jobId, int[] stageIds) { + Assert.assertTrue(jobMonitorOutput.contains(RemoteSparkJobMonitor.LOG_APP_ID + appId)); + Assert.assertTrue(jobMonitorOutput.contains(RemoteSparkJobMonitor.LOG_APP_KILL + appId)); + Assert.assertTrue(jobMonitorOutput.contains(RemoteSparkJobMonitor.LOG_WEB_URL + webURL)); + Assert.assertTrue(jobMonitorOutput.contains( + String.format(RemoteSparkJobMonitor.LOG_STAGES_FORMAT, jobId, + Arrays.toString(stageIds)))); + Assert.assertTrue(jobMonitorOutput.contains(RemoteSparkJobMonitor.LOG_JOB_PROGRESS_HEADER)); + Assert.assertTrue(jobMonitorOutput.contains(RemoteSparkJobMonitor.LOG_JOB_PROGRESS_FORMAT)); + } + + private Future startMonitor(HiveConf hiveConf, SparkJobMonitor jobMonitor, + PrintStream ps) { + return CompletableFuture.supplyAsync( + () -> { + SessionState.start(hiveConf); + SessionState.get().info = ps; + SessionState.get().err = ps; + return jobMonitor.startMonitor(); + }); + } + + private RemoteSparkJobStatus createMockRemoteSparkJobStatus(String appId, String webURL, int + jobId, int[] stageIds) throws HiveException { + RemoteSparkJobStatus remoteJobStatus = mock(RemoteSparkJobStatus.class); + 
+    when(remoteJobStatus.isRemoteActive()).thenReturn(true);
+    when(remoteJobStatus.getRemoteJobState()).thenReturn(JobHandle.State.SENT);
+    when(remoteJobStatus.getAppID()).thenReturn(appId);
+    when(remoteJobStatus.getWebUIURL()).thenReturn(webURL);
+    when(remoteJobStatus.getJobId()).thenReturn(jobId);
+    when(remoteJobStatus.getStageIds()).thenReturn(stageIds);
+    when(remoteJobStatus.getSparkStageProgress()).thenReturn(ImmutableMap.of(
+        "1_0", new SparkStageProgress(10, 10, 0, 0),
+        "2_0", new SparkStageProgress(10, 10, 0, 0),
+        "3_0", new SparkStageProgress(15, 15, 0, 0)));
+    return remoteJobStatus;
+  }
+}
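Editor's note: the sketch below is not part of the patch. It is a minimal, standalone illustration of the race this commit guards against: a polling monitor (like the check-interval loop in SparkJobMonitor) can miss the RUNNING state entirely when a Spark job is very short lived, observing SENT and then SUCCEEDED directly. The class name MonitorRaceSketch, the State enum, and the timing values are illustrative only and are not Hive APIs.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class MonitorRaceSketch {

  enum State { SENT, RUNNING, SUCCEEDED }

  public static void main(String[] args) throws InterruptedException {
    AtomicReference<State> state = new AtomicReference<>(State.SENT);

    // Short-lived "job": stays in RUNNING for only a few milliseconds.
    Thread job = new Thread(() -> {
      try {
        TimeUnit.MILLISECONDS.sleep(200);  // brief SENT phase
        state.set(State.RUNNING);
        TimeUnit.MILLISECONDS.sleep(10);   // tiny RUNNING window
        state.set(State.SUCCEEDED);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    job.start();

    // Polling "monitor": samples the state once per second. With a 10 ms RUNNING
    // window it will almost always observe SENT and then SUCCEEDED, never RUNNING,
    // so any console output tied only to the RUNNING branch would never be printed.
    boolean sawRunning = false;
    State observed = state.get();
    while (observed != State.SUCCEEDED) {
      sawRunning |= (observed == State.RUNNING);
      TimeUnit.SECONDS.sleep(1);
      observed = state.get();
    }
    System.out.println("Observed RUNNING at least once? " + sawRunning);
    job.join();
  }
}

The patched SUCCEEDED, FAILED, and CANCELLED branches handle exactly this case by printing the job info when running is still false, and the tests above drive the same SENT-to-SUCCEEDED and SENT-to-FAILED transitions through a mocked RemoteSparkJobStatus.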