diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java index 6217de4..bc5dd01 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java @@ -44,15 +44,16 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; - /** - * Used with remove spark client. - */ + /** + * Used with remote spark client. + */ public class RemoteSparkJobStatus implements SparkJobStatus { private static final Log LOG = LogFactory.getLog(RemoteSparkJobStatus.class.getName()); - // time (in seconds) to wait for a spark job to be submitted on remote cluster + // time (in milliseconds) to wait for a spark job to be submitted on remote cluster // after this period, we decide the job submission has failed so that client won't hang forever - private static final int WAIT_SUBMISSION_TIMEOUT = 30; + private static final long WAIT_SUBMISSION_TIMEOUT = 30000; // remember when the monitor starts private final long startTime; private final SparkClient sparkClient; @@ -61,7 +62,7 @@ public RemoteSparkJobStatus(SparkClient sparkClient, JobHandle jobHandle) { this.sparkClient = sparkClient; this.jobHandle = jobHandle; - startTime = System.currentTimeMillis(); + startTime = System.nanoTime(); } @Override @@ -72,7 +73,7 @@ public int getJobId() { @Override public JobExecutionStatus getState() { SparkJobInfo sparkJobInfo = getSparkJobInfo(); - return sparkJobInfo != null ? sparkJobInfo.status() : JobExecutionStatus.UNKNOWN; + return sparkJobInfo != null ? sparkJobInfo.status() : null; } @Override @@ -134,28 +135,14 @@ private SparkJobInfo getSparkJobInfo() { Integer sparkJobId = jobHandle.getSparkJobIds().size() == 1 ?
jobHandle.getSparkJobIds().get(0) : null; if (sparkJobId == null) { - int duration = (int) ((System.currentTimeMillis() - startTime) / 1000); + long duration = TimeUnit.MILLISECONDS.convert( + System.nanoTime() - startTime, TimeUnit.NANOSECONDS); if (duration <= WAIT_SUBMISSION_TIMEOUT) { return null; } else { - LOG.info("Job hasn't been submitted after " + duration + "s. Aborting it."); + LOG.info("Job hasn't been submitted after " + duration + "ms. Aborting it."); jobHandle.cancel(false); - return new SparkJobInfo() { - @Override - public int jobId() { - return -1; - } - - @Override - public int[] stageIds() { - return new int[0]; - } - - @Override - public JobExecutionStatus status() { - return JobExecutionStatus.FAILED; - } - }; + return getDefaultJobInfo(sparkJobId, JobExecutionStatus.FAILED); } } JobHandle getJobInfo = sparkClient.submit( @@ -200,42 +187,12 @@ public HiveSparkJobInfo call(JobContext jc) throws Exception { if (list != null && list.size() == 1) { JavaFutureAction futureAction = list.get(0); if (futureAction.isDone()) { - jobInfo = new SparkJobInfo() { - @Override - public int jobId() { - return sparkJobId; - } - - @Override - public int[] stageIds() { - return new int[0]; - } - - @Override - public JobExecutionStatus status() { - return JobExecutionStatus.SUCCEEDED; - } - }; + jobInfo = getDefaultJobInfo(sparkJobId, JobExecutionStatus.SUCCEEDED); } } } if (jobInfo == null) { - jobInfo = new SparkJobInfo() { - @Override - public int jobId() { - return -1; - } - - @Override - public int[] stageIds() { - return new int[0]; - } - - @Override - public JobExecutionStatus status() { - return JobExecutionStatus.UNKNOWN; - } - }; + jobInfo = getDefaultJobInfo(sparkJobId, JobExecutionStatus.UNKNOWN); } return new HiveSparkJobInfo(jobInfo); } @@ -291,4 +248,25 @@ public HiveSparkStageInfo call(JobContext jc) throws Exception { return results; } + + private static SparkJobInfo getDefaultJobInfo(final Integer jobId, + final JobExecutionStatus status) { + return new SparkJobInfo() { + + @Override + public int jobId() { + return jobId == 
null ? -1 : jobId; + } + + @Override + public int[] stageIds() { + return new int[0]; + } + + @Override + public JobExecutionStatus status() { + return status; + } + }; + } }