diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 84398c6..99c26ce 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3350,6 +3350,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "hive.spark.use.groupby.shuffle", true,
         "Spark groupByKey transformation has better performance but uses unbounded memory." +
             "Turn this off when there is a memory issue."),
+    SPARK_JOB_MAX_TASKS("hive.spark.job.max.tasks", -1, "The maximum number of tasks a Spark job may have.\n" +
+        "If a Spark job contains more tasks than the maximum, it will be cancelled. A value of -1 means no limit."),
     NWAYJOINREORDER("hive.reorder.nway.joins", true,
       "Runs reordering of tables within single n-way join (i.e.: picks streamtable)"),
     HIVE_MERGE_NWAY_JOINS("hive.merge.nway.joins", true,
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 32a7730..98b1605 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -129,8 +129,14 @@ public int execute(DriverContext driverContext) {
         // TODO: If the timeout is because of lack of resources in the cluster, we should
         // ideally also cancel the app request here. But w/o facilities from Spark or YARN,
         // it's difficult to do it on hive side alone. See HIVE-12650.
+        LOG.info("Failed to submit Spark job " + sparkJobID);
+        jobRef.cancelJob();
+      } else if (rc == 4) {
+        LOG.info("The number of tasks exceeds the limit " + conf.getIntVar(HiveConf.ConfVars.SPARK_JOB_MAX_TASKS) +
+            ". Cancelling Spark job " + sparkJobID + " with application ID " + jobID);
         jobRef.cancelJob();
       }
+
       if (this.jobID == null) {
         this.jobID = sparkJobStatus.getAppID();
       }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index dd73f3e..2b25f92 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -34,7 +34,7 @@
 * It print current job status to console and sleep current thread between monitor interval.
 */
 public class RemoteSparkJobMonitor extends SparkJobMonitor {
-
+  private int sparkJobMaxTaskCount = -1;
   private RemoteSparkJobStatus sparkJobStatus;
   private final HiveConf hiveConf;
 
@@ -42,6 +42,7 @@ public RemoteSparkJobMonitor(HiveConf hiveConf, RemoteSparkJobStatus sparkJobSta
     super(hiveConf);
     this.sparkJobStatus = sparkJobStatus;
     this.hiveConf = hiveConf;
+    sparkJobMaxTaskCount = hiveConf.getIntVar(HiveConf.ConfVars.SPARK_JOB_MAX_TASKS);
   }
 
   @Override
@@ -100,6 +101,17 @@ public int startMonitor() {
             } else {
               console.logInfo(format);
             }
+          } else {
+            // Count the number of tasks, and kill the application if it goes beyond the limit.
+            if (sparkJobMaxTaskCount != -1) {
+              int totalTaskCount = getTotalTaskCount(progressMap);
+              if (totalTaskCount > sparkJobMaxTaskCount) {
+                rc = 4;
+                done = true;
+                console.printInfo("\nThe total number of tasks in the Spark job [" + totalTaskCount +
+                    "] is greater than the limit [" + sparkJobMaxTaskCount + "]. The Spark job will be cancelled.");
+              }
+            }
           }
 
           printStatus(progressMap, lastProgressMap);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
index 0b224f2..d707303 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
@@ -65,7 +65,8 @@
   protected final boolean inPlaceUpdate;
   private int lines = 0;
   private final PrintStream out;
-
+  // Total number of tasks in the Spark job
+  private int totalTasks = 0;
 
   private static final int COLUMN_1_WIDTH = 16;
   private static final String HEADER_FORMAT = "%16s%10s %13s  %5s  %9s  %7s  %7s  %6s  ";
@@ -93,8 +94,7 @@ private void printStatusInPlace(Map<String, SparkStageProgress> progressMap) {
 
     StringBuilder reportBuffer = new StringBuilder();
 
-    // Num of total and completed tasks
-    int sumTotal = 0;
+    // Number of completed tasks
     int sumComplete = 0;
 
     // position the cursor to line 0
@@ -114,7 +114,7 @@ private void printStatusInPlace(Map<String, SparkStageProgress> progressMap) {
       final int total = progress.getTotalTaskCount();
       final int running = progress.getRunningTaskCount();
      final int failed = progress.getFailedTaskCount();
-      sumTotal += total;
+      totalTasks += total;
      sumComplete += complete;
 
       StageState state = total > 0 ? StageState.PENDING : StageState.FINISHED;
@@ -146,7 +146,7 @@ private void printStatusInPlace(Map<String, SparkStageProgress> progressMap) {
     }
     reprintMultiLine(reportBuffer.toString());
     reprintLine(SEPARATOR);
-    final float progress = (sumTotal == 0) ? 1.0f : (float) sumComplete / (float) sumTotal;
+    final float progress = (totalTasks == 0) ? 1.0f : (float) sumComplete / (float) totalTasks;
     String footer = getFooter(numKey, completed.size(), progress, startTime);
     reprintLineWithColorAsBold(footer, Ansi.Color.RED);
     reprintLine(SEPARATOR);
@@ -173,6 +173,18 @@ protected void printStatus(Map<String, SparkStageProgress> progressMap,
     lastPrintTime = System.currentTimeMillis();
   }
 
+  protected int getTotalTaskCount(Map<String, SparkStageProgress> progressMap) {
+    if (totalTasks != 0) {
+      return totalTasks;
+    }
+
+    for (SparkStageProgress progress : progressMap.values()) {
+      totalTasks += progress.getTotalTaskCount();
+    }
+
+    return totalTasks;
+  }
+
   private String getReport(Map<String, SparkStageProgress> progressMap) {
     StringBuilder reportBuffer = new StringBuilder();
     SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
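
A minimal sketch of how the new limit is meant to be consumed, assuming the patch above is applied. Only HiveConf.ConfVars.SPARK_JOB_MAX_TASKS, its -1 default, and the rc == 4 cancellation path come from the patch; the class name SparkJobMaxTasksExample and the task counts are illustrative.

import org.apache.hadoop.hive.conf.HiveConf;

public class SparkJobMaxTasksExample {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();

    // Cap Spark jobs at 1000 tasks; the default of -1 disables the check.
    conf.setIntVar(HiveConf.ConfVars.SPARK_JOB_MAX_TASKS, 1000);

    // RemoteSparkJobMonitor reads the limit once in its constructor and, on each
    // monitoring pass, compares it with the job's total task count.
    int limit = conf.getIntVar(HiveConf.ConfVars.SPARK_JOB_MAX_TASKS);
    int totalTaskCount = 1500; // illustrative; the monitor computes this via getTotalTaskCount()

    if (limit != -1 && totalTaskCount > limit) {
      // The monitor returns rc = 4, which SparkTask translates into jobRef.cancelJob().
      System.out.println("Job would be cancelled: " + totalTaskCount + " tasks > limit " + limit);
    }
  }
}

Since hive.spark.job.max.tasks is an ordinary HiveConf variable, it should also be settable per session, e.g. SET hive.spark.job.max.tasks=1000; from the Hive CLI.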