diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 8c075b8..362072f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -89,7 +89,7 @@ public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverC
 
   @Override
   public int execute(DriverContext driverContext) {
-    int rc = 1;
+    int rc = 0;
     SparkSession sparkSession = null;
     SparkSessionManager sparkSessionManager = null;
     try {
@@ -104,7 +104,7 @@ public int execute(DriverContext driverContext) {
       SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
       if (sparkJobStatus != null) {
         SparkJobMonitor monitor = new SparkJobMonitor(sparkJobStatus);
-        monitor.startMonitor();
+        rc = monitor.startMonitor();
         // for RSC, we should get the counters after job has finished
         sparkCounters = sparkJobStatus.getCounter();
         SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics();
@@ -114,10 +114,14 @@ public int execute(DriverContext driverContext) {
         }
         sparkJobStatus.cleanup();
       }
-      rc = 0;
     } catch (Exception e) {
-      LOG.error("Failed to execute spark task.", e);
-      return 1;
+      String msg = "Failed to execute spark task, with exception '" + Utilities.getNameMessage(e) + "'";
+
+      // Has to use full name to make sure it does not conflict with
+      // org.apache.commons.lang.StringUtils
+      console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
+      LOG.error(msg, e);
+      rc = 1;
     } finally {
       if (sparkSession != null && sparkSessionManager != null) {
         rc = close(rc);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
index c075c35..90a2f9e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
@@ -27,6 +27,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.spark.JobExecutionStatus;
 
@@ -40,8 +41,7 @@
 
   private static final Log LOG = LogFactory.getLog(CLASS_NAME);
   private transient LogHelper console;
-  private final int checkInterval = 200;
-  private final int maxRetryInterval = 2500;
+  private final int checkInterval = 1000;
   private final int printInterval = 3000;
   private long lastPrintTime;
   private Set<String> completed;
@@ -58,15 +58,14 @@
   public int startMonitor() {
     boolean running = false;
     boolean done = false;
-    int failedCounter = 0;
     int rc = 0;
     JobExecutionStatus lastState = null;
     Map<String, SparkStageProgress> lastProgressMap = null;
     long startTime = -1;
 
     while (true) {
+      JobExecutionStatus state = sparkJobStatus.getState();
       try {
-        JobExecutionStatus state = sparkJobStatus.getState();
         if (LOG.isDebugEnabled()) {
           console.printInfo("state = " + state);
         }
@@ -123,16 +122,18 @@ public int startMonitor() {
           Thread.sleep(checkInterval);
         }
       } catch (Exception e) {
-        console.printInfo("Exception: " + e.getMessage());
-        if (++failedCounter % maxRetryInterval / checkInterval == 0
-            || e instanceof InterruptedException) {
-          console.printInfo("Killing Job...");
-          console.printError("Execution has failed.");
-          rc = 1;
-          done = true;
+        String msg = " with exception '" + Utilities.getNameMessage(e) + "'";
+        if (state == null || state.equals(JobExecutionStatus.UNKNOWN)) {
+          msg = "Job Submission failed" + msg;
         } else {
-          console.printInfo("Retrying...");
+          msg = "Ended Job = " + sparkJobStatus.getJobId() + msg;
         }
+
+        // Has to use full name to make sure it does not conflict with
+        // org.apache.commons.lang.StringUtils
+        LOG.error(msg, e);
+        console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
+        rc = 1;
       } finally {
         if (done) {
           break;
diff --git ql/src/test/results/clientpositive/spark/bucket_map_join_spark4.q.out ql/src/test/results/clientpositive/spark/bucket_map_join_spark4.q.out
index f32b422..9794b21 100644
--- ql/src/test/results/clientpositive/spark/bucket_map_join_spark4.q.out
+++ ql/src/test/results/clientpositive/spark/bucket_map_join_spark4.q.out
@@ -441,7 +441,6 @@
 PREHOOK: Input: default@tbl1
 PREHOOK: Input: default@tbl2
 PREHOOK: Input: default@tbl3
 #### A masked pattern was here ####
-Status: Failed
 POSTHOOK: query: select a.key as key, a.value as val1, b.value as val2, c.value as val3 from tbl1 a join tbl2 b on a.key = b.key join tbl3 c on a.value = c.value
 POSTHOOK: type: QUERY
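
Note on the change: the patch drops the old retry loop in SparkJobMonitor (failedCounter / maxRetryInterval) in favor of fail-fast reporting, makes startMonitor() return a return code instead of swallowing failures, and has SparkTask.execute() consume that code. Hoisting the state read out of the try block lets the catch distinguish a submission failure (state still null/UNKNOWN) from a job that started and then died. The following is a minimal, self-contained sketch of that control flow only. All names in it (MonitorSketch, JobState, JobStatus, the placeholder return codes) are hypothetical stand-ins for the Hive/Spark classes, not the real API; terminal-state handling inside the loop is outside the patch hunks and is simplified here.

import java.util.concurrent.atomic.AtomicInteger;

public class MonitorSketch {

  // Stand-in for org.apache.spark.JobExecutionStatus.
  enum JobState { UNKNOWN, RUNNING, SUCCEEDED, FAILED }

  // Stand-in for SparkJobStatus: polling may itself throw (e.g. RPC errors).
  interface JobStatus {
    JobState getState() throws Exception;
    int getJobId();
  }

  // Mirrors the patched SparkJobMonitor.startMonitor(): poll until a
  // terminal state; on exception, fail once (no retries) and report whether
  // the job ever got past submission.
  static int startMonitor(JobStatus status, long checkIntervalMs) {
    while (true) {
      JobState state = null;  // read outside try so the catch can inspect it
      try {
        state = status.getState();
        if (state == JobState.SUCCEEDED) {
          return 0;
        }
        if (state == JobState.FAILED) {
          return 1;  // placeholder; the real terminal handling is not in the patch
        }
        Thread.sleep(checkIntervalMs);
      } catch (Exception e) {
        String msg = " with exception '" + e + "'";
        msg = (state == null || state == JobState.UNKNOWN)
            ? "Job Submission failed" + msg
            : "Ended Job = " + status.getJobId() + msg;
        System.err.println(msg);
        return 1;
      }
    }
  }

  // Mirrors the patched SparkTask.execute(): rc starts at 0, and only the
  // monitor's return code or a caught exception can flip it to non-zero.
  static int execute(JobStatus status) {
    int rc = 0;
    try {
      rc = startMonitor(status, 1000);
    } catch (Exception e) {
      System.err.println("Failed to execute spark task with exception '" + e + "'");
      rc = 1;
    }
    return rc;
  }

  public static void main(String[] args) {
    // Toy status that reports RUNNING twice, then SUCCEEDED.
    AtomicInteger polls = new AtomicInteger();
    JobStatus ok = new JobStatus() {
      public JobState getState() {
        return polls.incrementAndGet() < 3 ? JobState.RUNNING : JobState.SUCCEEDED;
      }
      public int getJobId() { return 0; }
    };
    System.out.println("rc = " + execute(ok));  // prints rc = 0
  }
}

The fail-fast design is also why the bucket_map_join_spark4.q.out golden file loses its "Status: Failed" line: the failure path now reports through console.printError and the propagated return code rather than the old monitor output.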