diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 84398c6..99c26ce 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3350,6 +3350,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "hive.spark.use.groupby.shuffle", true,
         "Spark groupByKey transformation has better performance but uses unbounded memory." +
             "Turn this off when there is a memory issue."),
+    SPARK_JOB_MAX_TASKS("hive.spark.job.max.tasks", -1, "The maximum number of tasks a Spark job may have.\n" +
+        "If a Spark job contains more tasks than the maximum, it will be cancelled. A value of -1 means no limit."),
     NWAYJOINREORDER("hive.reorder.nway.joins", true,
       "Runs reordering of tables within single n-way join (i.e.: picks streamtable)"),
     HIVE_MERGE_NWAY_JOINS("hive.merge.nway.joins", true,
diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties
index 753f3a9..5ab3076 100644
--- itests/src/test/resources/testconfiguration.properties
+++ itests/src/test/resources/testconfiguration.properties
@@ -1445,4 +1445,6 @@ spark.query.negative.files=groupby2_map_skew_multi_distinct.q,\
   groupby2_multi_distinct.q,\
   groupby3_map_skew_multi_distinct.q,\
   groupby3_multi_distinct.q,\
-  groupby_grouping_sets7.q
+  groupby_grouping_sets7.q,\
+  spark_job_max_tasks.q
+
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 32a7730..98b1605 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -129,8 +129,14 @@ public int execute(DriverContext driverContext) {
         // TODO: If the timeout is because of lack of resources in the cluster, we should
         // ideally also cancel the app request here. But w/o facilities from Spark or YARN,
         // it's difficult to do it on hive side alone. See HIVE-12650.
+        LOG.info("Failed to submit Spark job " + sparkJobID);
+        jobRef.cancelJob();
+      } else if (rc == 4) {
+        LOG.info("The number of tasks exceeds the limit of " + conf.getIntVar(HiveConf.ConfVars.SPARK_JOB_MAX_TASKS) +
+            ". Cancelling Spark job " + sparkJobID + " with application ID " + jobID);
         jobRef.cancelJob();
       }
+
       if (this.jobID == null) {
         this.jobID = sparkJobStatus.getAppID();
       }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index dd73f3e..9dfb65e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -34,7 +34,8 @@
  * It print current job status to console and sleep current thread between monitor interval.
  */
 public class RemoteSparkJobMonitor extends SparkJobMonitor {
-
+  private int sparkJobMaxTaskCount = -1;
+  private int totalTaskCount = 0;
   private RemoteSparkJobStatus sparkJobStatus;
   private final HiveConf hiveConf;
 
@@ -42,6 +43,7 @@ public RemoteSparkJobMonitor(HiveConf hiveConf, RemoteSparkJobStatus sparkJobSta
     super(hiveConf);
     this.sparkJobStatus = sparkJobStatus;
     this.hiveConf = hiveConf;
+    sparkJobMaxTaskCount = hiveConf.getIntVar(HiveConf.ConfVars.SPARK_JOB_MAX_TASKS);
   }
 
   @Override
@@ -100,6 +102,17 @@ public int startMonitor() {
           } else {
             console.logInfo(format);
           }
+        } else {
+          // Count the number of tasks, and kill the application if it goes beyond the limit.
+          if (sparkJobMaxTaskCount != -1 && totalTaskCount == 0) {
+            totalTaskCount = getTotalTaskCount(progressMap);
+            if (totalTaskCount > sparkJobMaxTaskCount) {
+              rc = 4;
+              done = true;
+              console.printInfo("\nThe total number of tasks in the Spark job [" + totalTaskCount + "] is greater than the limit [" +
+                  sparkJobMaxTaskCount + "]. The Spark job will be cancelled.");
+            }
+          }
         }
 
         printStatus(progressMap, lastProgressMap);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
index 0b224f2..41730b5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
@@ -66,7 +66,6 @@
   private int lines = 0;
   private final PrintStream out;
 
-  private static final int COLUMN_1_WIDTH = 16;
   private static final String HEADER_FORMAT = "%16s%10s %13s %5s %9s %7s %7s %6s ";
   private static final String STAGE_FORMAT = "%-16s%10s %13s %5s %9s %7s %7s %6s ";
 
@@ -173,6 +172,15 @@ protected void printStatus(Map<String, SparkStageProgress> progressMap,
     lastPrintTime = System.currentTimeMillis();
   }
 
+  protected int getTotalTaskCount(Map<String, SparkStageProgress> progressMap) {
+    int totalTasks = 0;
+    for (SparkStageProgress progress : progressMap.values()) {
+      totalTasks += progress.getTotalTaskCount();
+    }
+
+    return totalTasks;
+  }
+
   private String getReport(Map<String, SparkStageProgress> progressMap) {
     StringBuilder reportBuffer = new StringBuilder();
     SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
diff --git ql/src/test/queries/clientnegative/spark_job_max_tasks.q ql/src/test/queries/clientnegative/spark_job_max_tasks.q
new file mode 100644
index 0000000..7473050
--- /dev/null
+++ ql/src/test/queries/clientnegative/spark_job_max_tasks.q
@@ -0,0 +1,6 @@
+set hive.spark.job.max.tasks=2;
+
+EXPLAIN
+SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s;
+
+SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s;
diff --git ql/src/test/results/clientnegative/spark/spark_job_max_tasks.q.out ql/src/test/results/clientnegative/spark/spark_job_max_tasks.q.out
new file mode 100644
index 0000000..ba2f09e
--- /dev/null
+++ ql/src/test/results/clientnegative/spark/spark_job_max_tasks.q.out
@@ -0,0 +1,77 @@
+PREHOOK: query: EXPLAIN
+SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (GROUP, 2)
+        Reducer 3 <- Reducer 2 (SORT, 1)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1
+            Map Operator Tree:
+                TableScan
+                  alias: src1
+                  Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string), value (type: string)
+                    outputColumnNames: key, value
+                    Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+                    Group By Operator
+                      aggregations: sum(value)
+                      keys: key (type: string)
+                      mode: hash
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col1 (type: double)
+        Reducer 2
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: sum(VALUE._col0)
+                keys: KEY._col0 (type: string)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col1 (type: double)
+                  sort order: +
+                  Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col0 (type: string)
+        Reducer 3
+            Reduce Operator Tree:
+              Select Operator
+                expressions: VALUE._col0 (type: string), KEY.reducesinkkey0 (type: double)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src1
+#### A masked pattern was here ####
+FAILED: Execution Error, return code 4 from org.apache.hadoop.hive.ql.exec.spark.SparkTask
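
Usage sketch (not part of the patch): once the property exists, a Hive on Spark session would cap job size roughly as below. The limit value 1000 is an arbitrary example, and hive.execution.engine=spark is assumed to already be set for the session; a query whose Spark job exceeds the limit is expected to fail with "return code 4 from org.apache.hadoop.hive.ql.exec.spark.SparkTask", as in the q.out above.

  set hive.execution.engine=spark;
  set hive.spark.job.max.tasks=1000;
  -- Fails with return code 4 if the resulting Spark job contains more than 1000 tasks.
  SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s;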