diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java index 055b2cd..c98e54b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java @@ -44,15 +44,16 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; /** * Used with remove spark client. */ public class RemoteSparkJobStatus implements SparkJobStatus { private static final Log LOG = LogFactory.getLog(RemoteSparkJobStatus.class.getName()); - // time (in seconds) to wait for a spark job to be submitted on remote cluster - // after this period, we decide the job submission has failed so that client won't hang forever - private static final int WAIT_SUBMISSION_TIMEOUT = 30; + // time (in milliseconds) to wait for a job to start on remote driver, this is to prevent + // the client from hanging in case of failures outside job execution on remote side + private static final long WAIT_START_TIMEOUT = 30000; // remember when the monitor starts private final long startTime; private final SparkClient sparkClient; @@ -61,7 +62,7 @@ public RemoteSparkJobStatus(SparkClient sparkClient, JobHandle jobHandle) { this.sparkClient = sparkClient; this.jobHandle = jobHandle; - startTime = System.currentTimeMillis(); + startTime = System.nanoTime(); } @Override @@ -72,7 +73,8 @@ public int getJobId() { @Override public JobExecutionStatus getState() { SparkJobInfo sparkJobInfo = getSparkJobInfo(); - return sparkJobInfo != null ? sparkJobInfo.status() : JobExecutionStatus.UNKNOWN; + // TODO: make sure JobHandle is completed if status is SUCCEEDED + return sparkJobInfo != null ? sparkJobInfo.status() : null; } @Override @@ -107,15 +109,15 @@ public SparkCounters getCounter() { @Override public SparkStatistics getSparkStatistics() { + MetricsCollection metricsCollection = jobHandle.getMetrics(); + if (metricsCollection == null || getCounter() == null) { + return null; + } SparkStatisticsBuilder sparkStatisticsBuilder = new SparkStatisticsBuilder(); // add Hive operator level statistics. sparkStatisticsBuilder.add(getCounter()); // add spark job metrics. String jobIdentifier = "Spark Job[" + jobHandle.getClientJobId() + "] Metrics"; - MetricsCollection metricsCollection = jobHandle.getMetrics(); - if (metricsCollection == null) { - return null; - } Map flatJobMetric = extractMetrics(metricsCollection); for (Map.Entry entry : flatJobMetric.entrySet()) { @@ -134,28 +136,12 @@ private SparkJobInfo getSparkJobInfo() { Integer sparkJobId = jobHandle.getSparkJobIds().size() == 1 ? jobHandle.getSparkJobIds().get(0) : null; if (sparkJobId == null) { - int duration = (int) ((System.currentTimeMillis() - startTime) / 1000); - if (duration <= WAIT_SUBMISSION_TIMEOUT) { - return null; - } else { - LOG.info("Job hasn't been submitted after " + duration + "s. Aborting it."); + long duration = TimeUnit.MILLISECONDS.convert( + System.nanoTime() - startTime, TimeUnit.NANOSECONDS); + if (!jobHandle.hasStarted() && duration > WAIT_START_TIMEOUT) { + LOG.info("Job hasn't started after " + duration / 1000 + "s. Aborting it."); jobHandle.cancel(false); - return new SparkJobInfo() { - @Override - public int jobId() { - return -1; - } - - @Override - public int[] stageIds() { - return new int[0]; - } - - @Override - public JobExecutionStatus status() { - return JobExecutionStatus.FAILED; - } - }; + return getDefaultJobInfo(JobExecutionStatus.FAILED); } } JobHandle getJobInfo = sparkClient.submit( @@ -180,57 +166,32 @@ private SparkStageInfo getSparkStageInfo(int stageId) { private static class GetJobInfoJob implements Job { private final String clientJobId; - private final int sparkJobId; + private final Integer sparkJobId; - GetJobInfoJob(String clientJobId, int sparkJobId) { + GetJobInfoJob(String clientJobId, Integer sparkJobId) { this.clientJobId = clientJobId; this.sparkJobId = sparkJobId; } @Override public HiveSparkJobInfo call(JobContext jc) throws Exception { - SparkJobInfo jobInfo = jc.sc().statusTracker().getJobInfo(sparkJobId); + SparkJobInfo jobInfo = null; + if (sparkJobId != null) { + jobInfo = jc.sc().statusTracker().getJobInfo(sparkJobId); + } + if (jobInfo == null) { List> list = jc.getMonitoredJobs().get(clientJobId); if (list != null && list.size() == 1) { JavaFutureAction futureAction = list.get(0); if (futureAction.isDone()) { - jobInfo = new SparkJobInfo() { - @Override - public int jobId() { - return sparkJobId; - } - - @Override - public int[] stageIds() { - return new int[0]; - } - - @Override - public JobExecutionStatus status() { - return JobExecutionStatus.SUCCEEDED; - } - }; + jobInfo = getDefaultJobInfo(JobExecutionStatus.SUCCEEDED); } } } - if (jobInfo == null) { - jobInfo = new SparkJobInfo() { - @Override - public int jobId() { - return -1; - } - @Override - public int[] stageIds() { - return new int[0]; - } - - @Override - public JobExecutionStatus status() { - return JobExecutionStatus.UNKNOWN; - } - }; + if (jobInfo == null) { + jobInfo = getDefaultJobInfo(JobExecutionStatus.UNKNOWN); } return new HiveSparkJobInfo(jobInfo); } @@ -281,4 +242,23 @@ public HiveSparkStageInfo call(JobContext jc) throws Exception { return results; } + + private static SparkJobInfo getDefaultJobInfo(final JobExecutionStatus status) { + return new SparkJobInfo() { + @Override + public int jobId() { + return -1; + } + + @Override + public int[] stageIds() { + return new int[0]; + } + + @Override + public JobExecutionStatus status() { + return status; + } + }; + } } diff --git spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java index fd5daf4..24e1b65 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java +++ spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java @@ -54,6 +54,11 @@ */ SparkCounters getSparkCounters(); + /** + * Whether the job has started on remote driver + */ + boolean hasStarted(); + // TODO: expose job status? } diff --git spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java index 806e1ea..69851a7 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java +++ spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java @@ -37,13 +37,14 @@ private final MetricsCollection metrics; private final Object monitor; - private AtomicBoolean cancelled; - private boolean completed; - private T result; - private Throwable error; + private final AtomicBoolean cancelled; + private final AtomicBoolean completed; + private volatile T result; + private volatile Throwable error; private final List sparkJobIds; - private SparkCounters sparkCounters; + private volatile SparkCounters sparkCounters; + private final AtomicBoolean started; JobHandleImpl(SparkClientImpl client, String jobId) { this.client = client; @@ -51,9 +52,10 @@ this.monitor = new Object(); this.metrics = new MetricsCollection(); this.cancelled = new AtomicBoolean(); - this.completed = false; + this.completed = new AtomicBoolean(); this.sparkJobIds = new CopyOnWriteArrayList(); sparkCounters = null; + started = new AtomicBoolean(); } /** Requests a running job to be cancelled. */ @@ -89,7 +91,7 @@ public boolean isCancelled() { @Override public boolean isDone() { - return completed; + return completed.get(); } /** @@ -122,10 +124,15 @@ public SparkCounters getSparkCounters() { return sparkCounters; } + @Override + public boolean hasStarted() { + return started.get(); + } + private T get(long timeout) throws ExecutionException, InterruptedException, TimeoutException { long deadline = System.currentTimeMillis() + timeout; synchronized (monitor) { - while (!completed && !cancelled.get()) { + while (!completed.get() && !cancelled.get()) { if (timeout >= 0) { monitor.wait(timeout); } else { @@ -154,7 +161,7 @@ void complete(Object result, Throwable error) { synchronized (monitor) { this.result = (T) result; this.error = error; - this.completed = true; + this.completed.set(true); monitor.notifyAll(); } } @@ -162,4 +169,8 @@ void complete(Object result, Throwable error) { public void setSparkCounters(SparkCounters sparkCounters) { this.sparkCounters = sparkCounters; } + + public void setStarted(boolean started) { + this.started.set(started); + } } diff --git spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java index 68295f3..7edf4a9 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java +++ spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java @@ -131,18 +131,33 @@ /** * Inform the client that a new spark job has been submitted for the client job */ - static class JobSubmitted implements Serializable { + static class SparkJobSubmitted implements Serializable { final String clientJobId; final int sparkJobId; - JobSubmitted(String clientJobId, int sparkJobId) { + SparkJobSubmitted(String clientJobId, int sparkJobId) { this.clientJobId = clientJobId; this.sparkJobId = sparkJobId; } - JobSubmitted() { + SparkJobSubmitted() { this(null, -1); } } + /** + * Inform the client that the job has started on remote driver + */ + static class JobStarted implements Serializable { + final String clientJobId; + + JobStarted(String clientJobId) { + this.clientJobId = clientJobId; + } + + JobStarted() { + this(null); + } + } + } diff --git spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java index cbe06d8..c1749a1 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java +++ spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java @@ -211,6 +211,7 @@ public void onReceive(Object message) throws Exception { @Override public Void call() throws Exception { try { + client.tell(new Protocol.JobStarted(req.id), actor); jc.setMonitorCb(new MonitorCallback() { @Override public void call(JavaFutureAction future, SparkCounters sparkCounters) { @@ -261,7 +262,7 @@ private void monitorJob(JavaFutureAction job, SparkCounters sparkCounters) { } jc.getMonitoredJobs().get(req.id).add(job); this.sparkCounters = sparkCounters; - client.tell(new Protocol.JobSubmitted(req.id, job.jobIds().get(0)), actor); + client.tell(new Protocol.SparkJobSubmitted(req.id, job.jobIds().get(0)), actor); } } diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java index b579e84..8c875b3 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java +++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; -import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.Serializable; import java.io.Writer; @@ -345,16 +344,25 @@ public void onReceive(Object message) throws Exception { } else { LOG.warn("Received result for unknown job {}", jr.id); } - } else if (message instanceof Protocol.JobSubmitted) { - Protocol.JobSubmitted jobSubmitted = (Protocol.JobSubmitted) message; - JobHandleImpl handle = jobs.get(jobSubmitted.clientJobId); + } else if (message instanceof Protocol.SparkJobSubmitted) { + Protocol.SparkJobSubmitted sparkJobSubmitted = (Protocol.SparkJobSubmitted) message; + JobHandleImpl handle = jobs.get(sparkJobSubmitted.clientJobId); if (handle != null) { LOG.info("Received spark job ID: {} for {}", - jobSubmitted.sparkJobId, jobSubmitted.clientJobId); - handle.getSparkJobIds().add(jobSubmitted.sparkJobId); + sparkJobSubmitted.sparkJobId, sparkJobSubmitted.clientJobId); + handle.getSparkJobIds().add(sparkJobSubmitted.sparkJobId); } else { LOG.warn("Received spark job ID: {} for unknown job {}", - jobSubmitted.sparkJobId, jobSubmitted.clientJobId); + sparkJobSubmitted.sparkJobId, sparkJobSubmitted.clientJobId); + } + } else if (message instanceof Protocol.JobStarted) { + Protocol.JobStarted jobStarted = (Protocol.JobStarted) message; + JobHandleImpl handle = jobs.get(jobStarted.clientJobId); + if (handle != null) { + LOG.info("Job " + jobStarted.clientJobId + " has started on remote driver."); + handle.setStarted(true); + } else { + LOG.warn("Received JobStarted message for unknown job {}", jobStarted.clientJobId); } } }