diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 176d36f..fce8db3 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3371,6 +3371,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "Turn this off when there is a memory issue."),
     SPARK_JOB_MAX_TASKS("hive.spark.job.max.tasks", -1, "The maximum number of tasks a Spark job may have.\n" +
         "If a Spark job contains more tasks than the maximum, it will be cancelled. A value of -1 means no limit."),
+    SPARK_STAGE_MAX_TASKS("hive.spark.stage.max.tasks", -1, "The maximum number of tasks a stage in a Spark job may have.\n" +
+        "If a Spark job stage contains more tasks than the maximum, the job will be cancelled. A value of -1 means no limit."),
     NWAYJOINREORDER("hive.reorder.nway.joins", true,
       "Runs reordering of tables within single n-way join (i.e.: picks streamtable)"),
     HIVE_MERGE_NWAY_JOINS("hive.merge.nway.joins", true,
diff --git itests/src/test/resources/testconfiguration.properties itests/src/test/resources/testconfiguration.properties
index e613374..62462bd 100644
--- itests/src/test/resources/testconfiguration.properties
+++ itests/src/test/resources/testconfiguration.properties
@@ -1459,5 +1459,6 @@
 spark.query.negative.files=groupby2_map_skew_multi_distinct.q,\
   groupby3_map_skew_multi_distinct.q,\
   groupby3_multi_distinct.q,\
   groupby_grouping_sets7.q,\
-  spark_job_max_tasks.q
+  spark_job_max_tasks.q,\
+  spark_stage_max_tasks.q
diff --git itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
index 1457db0..27b87fb 100644
--- itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
+++ itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CliConfigs.java
@@ -288,6 +288,7 @@ public NegativeCliConfig() {
         excludesFrom(testConfigProps, "minimr.query.negative.files");
         excludeQuery("authorization_uri_import.q");
         excludeQuery("spark_job_max_tasks.q");
+        excludeQuery("spark_stage_max_tasks.q");
         setResultsDir("ql/src/test/results/clientnegative");
         setLogDir("itests/qtest/target/qfile-results/clientnegative");
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index b4fb49f..2ee8c93 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -138,7 +138,7 @@ public int execute(DriverContext driverContext) {
         LOG.info("Failed to submit Spark job " + sparkJobID);
         killJob();
       } else if (rc == 4) {
-        LOG.info("The number of tasks reaches above the limit " + conf.getIntVar(HiveConf.ConfVars.SPARK_JOB_MAX_TASKS) +
+        LOG.info("The Spark job or one of its stages has too many tasks" +
            ". Cancelling Spark job " + sparkJobID + " with application ID " + jobID );
         killJob();
       }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index 9dfb65e..37b8363 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -35,7 +35,9 @@
  */
 public class RemoteSparkJobMonitor extends SparkJobMonitor {
   private int sparkJobMaxTaskCount = -1;
+  private int sparkStageMaxTaskCount = -1;
   private int totalTaskCount = 0;
+  private int stageMaxTaskCount = 0;
   private RemoteSparkJobStatus sparkJobStatus;
   private final HiveConf hiveConf;
 
@@ -44,6 +46,7 @@ public RemoteSparkJobMonitor(HiveConf hiveConf, RemoteSparkJobStatus sparkJobSta
     this.sparkJobStatus = sparkJobStatus;
     this.hiveConf = hiveConf;
     sparkJobMaxTaskCount = hiveConf.getIntVar(HiveConf.ConfVars.SPARK_JOB_MAX_TASKS);
+    sparkStageMaxTaskCount = hiveConf.getIntVar(HiveConf.ConfVars.SPARK_STAGE_MAX_TASKS);
   }
 
   @Override
@@ -103,6 +106,17 @@ public int startMonitor() {
             console.logInfo(format);
           }
         } else {
+          // Find the maximum number of tasks in any single stage of the job and cancel the job if it exceeds the limit.
+          if (sparkStageMaxTaskCount != -1 && stageMaxTaskCount == 0) {
+            stageMaxTaskCount = getStageMaxTaskCount(progressMap);
+            if (stageMaxTaskCount > sparkStageMaxTaskCount) {
+              rc = 4;
+              done = true;
+              console.printInfo("\nThe number of tasks in one stage of the Spark job [" + stageMaxTaskCount + "] is greater than the limit [" +
+                  sparkStageMaxTaskCount + "]. The Spark job will be cancelled.");
+            }
+          }
+
           // Count the number of tasks, and kill application if it goes beyond the limit.
           if (sparkJobMaxTaskCount != -1 && totalTaskCount == 0) {
             totalTaskCount = getTotalTaskCount(progressMap);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
index 41730b5..078a57d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
@@ -181,6 +181,18 @@ protected int getTotalTaskCount(Map<String, SparkStageProgress> progressMap) {
     return totalTasks;
   }
 
+  protected int getStageMaxTaskCount(Map<String, SparkStageProgress> progressMap) {
+    int stageMaxTasks = 0;
+    for (SparkStageProgress progress : progressMap.values()) {
+      int tasks = progress.getTotalTaskCount();
+      if (tasks > stageMaxTasks) {
+        stageMaxTasks = tasks;
+      }
+    }
+
+    return stageMaxTasks;
+  }
+
   private String getReport(Map<String, SparkStageProgress> progressMap) {
     StringBuilder reportBuffer = new StringBuilder();
     SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
diff --git ql/src/test/queries/clientnegative/spark_stage_max_tasks.q ql/src/test/queries/clientnegative/spark_stage_max_tasks.q
new file mode 100644
index 0000000..5bdb014
--- /dev/null
+++ ql/src/test/queries/clientnegative/spark_stage_max_tasks.q
@@ -0,0 +1,6 @@
+set hive.spark.stage.max.tasks=1;
+
+EXPLAIN
+SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s;
+
+SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s;
diff --git ql/src/test/results/clientnegative/spark/spark_stage_max_tasks.q.out ql/src/test/results/clientnegative/spark/spark_stage_max_tasks.q.out
new file mode 100644
index 0000000..ba2f09e
--- /dev/null
+++ ql/src/test/results/clientnegative/spark/spark_stage_max_tasks.q.out
@@ -0,0 +1,77 @@
+PREHOOK: query: EXPLAIN
+SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (GROUP, 2)
+        Reducer 3 <- Reducer 2 (SORT, 1)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: src1
+                  Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string), value (type: string)
+                    outputColumnNames: key, value
+                    Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+                    Group By Operator
+                      aggregations: sum(value)
+                      keys: key (type: string)
+                      mode: hash
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 25 Data size: 191 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col1 (type: double)
+        Reducer 2 
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: sum(VALUE._col0)
+                keys: KEY._col0 (type: string)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col1 (type: double)
+                  sort order: +
+                  Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col0 (type: string)
+        Reducer 3 
+            Reduce Operator Tree:
+              Select Operator
+                expressions: VALUE._col0 (type: string), KEY.reducesinkkey0 (type: double)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 12 Data size: 91 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src1
+#### A masked pattern was here ####
+FAILED: Execution Error, return code 4 from org.apache.hadoop.hive.ql.exec.spark.SparkTask
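
For reference, a minimal HiveQL sketch of how the new setting would be exercised in a session. The threshold of 2000 is an illustrative value only (the default added by this patch is -1, i.e. no limit), and src1 is the table used by the q-file above; the failure message matches the q.out output.

set hive.spark.stage.max.tasks=2000;  -- illustrative threshold, not the default
set hive.spark.job.max.tasks=-1;      -- leave the whole-job limit disabled

-- If any single stage of the Spark plan would run more than 2000 tasks, the monitor
-- cancels the job and the query fails with:
--   FAILED: Execution Error, return code 4 from org.apache.hadoop.hive.ql.exec.spark.SparkTask
SELECT key, sum(value) AS s FROM src1 GROUP BY key ORDER BY s;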