diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java index 758fe92..1c64482 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java @@ -45,12 +45,18 @@ */ public class RemoteSparkJobStatus implements SparkJobStatus { private static final Log LOG = LogFactory.getLog(RemoteSparkJobStatus.class.getName()); + // time (in seconds) to wait for a spark job to be submitted on remote cluster + // after this period, we decide the job submission has failed so that client won't hang forever + private static final int WAIT_SUBMISSION_TIMEOUT = 30; + // remember when the monitor starts + private final long startTime; private final SparkClient sparkClient; private final JobHandle jobHandle; public RemoteSparkJobStatus(SparkClient sparkClient, JobHandle jobHandle) { this.sparkClient = sparkClient; this.jobHandle = jobHandle; + startTime = System.currentTimeMillis(); } @Override @@ -108,7 +114,29 @@ private SparkJobInfo getSparkJobInfo() { Integer sparkJobId = jobHandle.getSparkJobIds().size() == 1 ? jobHandle.getSparkJobIds().get(0) : null; if (sparkJobId == null) { - return null; + int duration = (int) ((System.currentTimeMillis() - startTime) / 1000); + if (duration <= WAIT_SUBMISSION_TIMEOUT) { + return null; + } else { + LOG.info("Job hasn't been submitted after " + duration + "s. Aborting it."); + jobHandle.cancel(false); + return new SparkJobInfo() { + @Override + public int jobId() { + return -1; + } + + @Override + public int[] stageIds() { + return new int[0]; + } + + @Override + public JobExecutionStatus status() { + return JobExecutionStatus.FAILED; + } + }; + } } JobHandle getJobInfo = sparkClient.submit( new GetJobInfoJob(jobHandle.getClientJobId(), sparkJobId));