diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 2ee8c93..03720db 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -120,7 +120,6 @@ public int execute(DriverContext driverContext) {
 
       addToHistory(jobRef);
       sparkJobID = jobRef.getJobId();
-      this.jobID = jobRef.getSparkJobStatus().getAppID();
       rc = jobRef.monitorJob();
       SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
       getSparkJobInfo(sparkJobStatus, rc);
@@ -142,10 +141,6 @@ public int execute(DriverContext driverContext) {
            ". Cancelling Spark job " + sparkJobID + " with application ID " + jobID );
        killJob();
      }
-
-      if (this.jobID == null) {
-        this.jobID = sparkJobStatus.getAppID();
-      }
      sparkJobStatus.cleanup();
    } catch (Exception e) {
      String msg = "Failed to execute spark task, with exception '" + Utilities.getNameMessage(e) + "'";
@@ -246,6 +241,14 @@ public String getName() {
  }

  @Override
+  public String getJobID() {
+    if (jobID == null && jobRef != null) {
+      jobID = jobRef.getSparkJobStatus().getAppID();
+    }
+    return jobID;
+  }
+
+  @Override
  public Collection<MapWork> getMapWork() {
    List<MapWork> result = Lists.newArrayList();
    for (BaseWork w : getWork().getRoots()) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index 37b8363..7e012d7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -73,7 +73,8 @@ public int startMonitor() {
          console.printError("Job hasn't been submitted after " + timeCount + "s." +
              " Aborting it.\nPossible reasons include network issues, " +
              "errors in remote driver or the cluster has no available resources, etc.\n" +
-              "Please check YARN or Spark driver's logs for further information.");
+              "Please check YARN or Spark driver's logs for further information.\n" +
+              "The timeout is controlled by " + HiveConf.ConfVars.SPARK_JOB_MONITOR_TIMEOUT + ".");
          console.printError("Status: " + state);
          running = false;
          done = true;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
index 67db303..e1d5b55 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -20,6 +20,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -42,6 +43,7 @@
 import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /**
  * Used with remove spark client.
@@ -62,11 +64,17 @@ public RemoteSparkJobStatus(SparkClient sparkClient, JobHandle<Serializable> job
 
  @Override
  public String getAppID() {
-    Future<String> getAppID = sparkClient.run(new GetAppIDJob());
    try {
+      Future<String> getAppID = sparkClient.run(new GetAppIDJob());
      return getAppID.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS);
    } catch (Exception e) {
-      LOG.warn("Failed to get APP ID.", e);
+      String errorMessage = "Failed to get Spark application ID";
+      if (e instanceof TimeoutException) {
+        errorMessage += " after " + sparkClientTimeoutInSeconds +
+            "s. The timeout is controlled by " + HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT;
+      }
+      errorMessage += ".";
+      LOG.warn(errorMessage, e);
      if (Thread.interrupted()) {
        error = e;
      }
@@ -168,9 +176,9 @@ private SparkJobInfo getSparkJobInfo() throws HiveException {
    if (sparkJobId == null) {
      return null;
    }
-    Future<SparkJobInfo> getJobInfo = sparkClient.run(
-        new GetJobInfoJob(jobHandle.getClientJobId(), sparkJobId));
    try {
+      Future<SparkJobInfo> getJobInfo = sparkClient.run(
+          new GetJobInfoJob(jobHandle.getClientJobId(), sparkJobId));
      return getJobInfo.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS);
    } catch (Exception e) {
      LOG.warn("Failed to get job info.", e);
@@ -179,8 +187,8 @@ private SparkJobInfo getSparkJobInfo() throws HiveException {
  }
 
  private SparkStageInfo getSparkStageInfo(int stageId) {
-    Future<SparkStageInfo> getStageInfo = sparkClient.run(new GetStageInfoJob(stageId));
    try {
+      Future<SparkStageInfo> getStageInfo = sparkClient.run(new GetStageInfoJob(stageId));
      return getStageInfo.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS);
    } catch (Throwable t) {
      LOG.warn("Error getting stage info", t);
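
For context, a minimal self-contained sketch of the lazy-fetch pattern this patch introduces in SparkTask.getJobID(): the application ID is now resolved on first request rather than eagerly in execute(), the remote call is bounded by a timeout, and a TimeoutException folds the timeout value into the warning. The class name LazyAppIdExample, the fetchAppIdRemotely() helper, and the TIMEOUT_SECONDS constant are hypothetical stand-ins for sparkClient.run(new GetAppIDJob()) and HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT, which are not on the classpath here; only the shape of the logic mirrors the diff.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class LazyAppIdExample {
  // Stand-in for HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT (illustrative value).
  private static final long TIMEOUT_SECONDS = 5;

  // Cached application ID; stays null until someone actually asks for it.
  private String jobID;

  // Stand-in for sparkClient.run(new GetAppIDJob()): an async call to the remote driver.
  private Future<String> fetchAppIdRemotely() {
    return CompletableFuture.supplyAsync(() -> "application_1500000000000_0001");
  }

  // Mirrors the patched SparkTask.getJobID(): fetch lazily, bound the blocking
  // get() with a timeout, and name the timeout in the warning when it fires.
  public String getJobID() {
    if (jobID == null) {
      try {
        jobID = fetchAppIdRemotely().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
      } catch (Exception e) {
        String msg = "Failed to get Spark application ID";
        if (e instanceof TimeoutException) {
          msg += " after " + TIMEOUT_SECONDS + "s";
        }
        System.err.println(msg + ". " + e);
      }
    }
    return jobID;
  }

  public static void main(String[] args) {
    System.out.println(new LazyAppIdExample().getJobID());
  }
}

The deferral matters because getAppID() is a blocking RPC to the remote driver: resolving it inside execute() stalls every query on that round trip, while resolving it in getJobID() pays the cost only when the ID is actually requested.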