diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index d3ea824..9b242e6 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3337,6 +3337,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "hive.spark.use.groupby.shuffle", true,
         "Spark groupByKey transformation has better performance but uses unbounded memory." +
         "Turn this off when there is a memory issue."),
+    SPARK_JOB_MAX_TASKS("hive.spark.job.max.tasks", -1, "The maximum number of tasks a Spark job may have.\n" +
+        "If a Spark job contains more tasks than the maximum, it will be cancelled. A value of -1 means no limit."),
     NWAYJOINREORDER("hive.reorder.nway.joins", true,
       "Runs reordering of tables within single n-way join (i.e.: picks streamtable)"),
     HIVE_MERGE_NWAY_JOINS("hive.merge.nway.joins", true,
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 32a7730..34ee349 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -129,6 +129,11 @@ public int execute(DriverContext driverContext) {
         // TODO: If the timeout is because of lack of resources in the cluster, we should
         // ideally also cancel the app request here. But w/o facilities from Spark or YARN,
         // it's difficult to do it on hive side alone. See HIVE-12650.
+        LOG.info("Failed to submit Spark job " + sparkJobID);
+        jobRef.cancelJob();
+      } else if (rc == 4) {
+        LOG.info("The number of tasks reaches above the limit. Killing YARN application " + jobID +
+            " for Spark job " + sparkJobID);
         jobRef.cancelJob();
       }
       if (this.jobID == null) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index dd73f3e..b8b7fe9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -34,7 +34,7 @@
  * It print current job status to console and sleep current thread between monitor interval.
  */
 public class RemoteSparkJobMonitor extends SparkJobMonitor {
-
+  private int sparkJobMaxTaskCount = -1;
   private RemoteSparkJobStatus sparkJobStatus;
   private final HiveConf hiveConf;
 
@@ -42,6 +42,7 @@ public RemoteSparkJobMonitor(HiveConf hiveConf, RemoteSparkJobStatus sparkJobSta
     super(hiveConf);
     this.sparkJobStatus = sparkJobStatus;
     this.hiveConf = hiveConf;
+    sparkJobMaxTaskCount = hiveConf.getIntVar(HiveConf.ConfVars.SPARK_JOB_MAX_TASKS);
   }
 
   @Override
@@ -100,6 +101,15 @@ public int startMonitor() {
               } else {
                 console.logInfo(format);
               }
+            } else {
+              // Count the number of tasks, and kill application if it goes beyond the limit.
+              int totalTaskCount = getTotalTaskCount(progressMap);
+              if (sparkJobMaxTaskCount != -1 && totalTaskCount > sparkJobMaxTaskCount) {
+                rc = 4;
+                done = true;
+                console.printInfo("\nThe total number of task in the Spark job [" + totalTaskCount + "] is greater than the limit [" +
+                    sparkJobMaxTaskCount + "]. The Spark job will be cancelled.");
+              }
             }
 
             printStatus(progressMap, lastProgressMap);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
index 0b224f2..f7985dd 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
@@ -173,6 +173,14 @@ protected void printStatus(Map<String, SparkStageProgress> progressMap,
     lastPrintTime = System.currentTimeMillis();
   }
 
+  protected int getTotalTaskCount(Map<String, SparkStageProgress> progressMap) {
+    int total = 0;
+    for (SparkStageProgress progress: progressMap.values() ) {
+      total += progress.getTotalTaskCount();
+    }
+    return total;
+  }
+
   private String getReport(Map<String, SparkStageProgress> progressMap) {
     StringBuilder reportBuffer = new StringBuilder();
     SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");