diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java index 055b2cd..5f1fae6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java @@ -44,15 +44,16 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; /** * Used with remove spark client. */ public class RemoteSparkJobStatus implements SparkJobStatus { private static final Log LOG = LogFactory.getLog(RemoteSparkJobStatus.class.getName()); - // time (in seconds) to wait for a spark job to be submitted on remote cluster - // after this period, we decide the job submission has failed so that client won't hang forever - private static final int WAIT_SUBMISSION_TIMEOUT = 30; + // time (in milliseconds) to wait for a job to start on remote driver, this is to prevent + // the client from hanging in case of failures outside job execution on remote side + private static final long WAIT_START_TIMEOUT = 30000; // remember when the monitor starts private final long startTime; private final SparkClient sparkClient; @@ -61,7 +62,7 @@ public RemoteSparkJobStatus(SparkClient sparkClient, JobHandle jobHandle) { this.sparkClient = sparkClient; this.jobHandle = jobHandle; - startTime = System.currentTimeMillis(); + startTime = System.nanoTime(); } @Override @@ -72,7 +73,8 @@ public int getJobId() { @Override public JobExecutionStatus getState() { SparkJobInfo sparkJobInfo = getSparkJobInfo(); - return sparkJobInfo != null ? sparkJobInfo.status() : JobExecutionStatus.UNKNOWN; + // TODO: make sure JobHandle is completed if status is SUCCEEDED + return sparkJobInfo != null ? sparkJobInfo.status() : null; } @Override @@ -107,15 +109,15 @@ public SparkCounters getCounter() { @Override public SparkStatistics getSparkStatistics() { + MetricsCollection metricsCollection = jobHandle.getMetrics(); + if (metricsCollection == null || getCounter() == null) { + return null; + } SparkStatisticsBuilder sparkStatisticsBuilder = new SparkStatisticsBuilder(); // add Hive operator level statistics. sparkStatisticsBuilder.add(getCounter()); // add spark job metrics. String jobIdentifier = "Spark Job[" + jobHandle.getClientJobId() + "] Metrics"; - MetricsCollection metricsCollection = jobHandle.getMetrics(); - if (metricsCollection == null) { - return null; - } Map flatJobMetric = extractMetrics(metricsCollection); for (Map.Entry entry : flatJobMetric.entrySet()) { @@ -134,28 +136,16 @@ private SparkJobInfo getSparkJobInfo() { Integer sparkJobId = jobHandle.getSparkJobIds().size() == 1 ? jobHandle.getSparkJobIds().get(0) : null; if (sparkJobId == null) { - int duration = (int) ((System.currentTimeMillis() - startTime) / 1000); - if (duration <= WAIT_SUBMISSION_TIMEOUT) { - return null; - } else { - LOG.info("Job hasn't been submitted after " + duration + "s. Aborting it."); + if (jobHandle.isDone()) { + return getDefaultJobInfo(jobHandle.isSuccessful() ? + JobExecutionStatus.SUCCEEDED : JobExecutionStatus.FAILED); + } + long duration = TimeUnit.MILLISECONDS.convert( + System.nanoTime() - startTime, TimeUnit.NANOSECONDS); + if (!jobHandle.hasStarted() && duration > WAIT_START_TIMEOUT) { + LOG.info("Job hasn't started after " + duration / 1000 + "s. Aborting it."); jobHandle.cancel(false); - return new SparkJobInfo() { - @Override - public int jobId() { - return -1; - } - - @Override - public int[] stageIds() { - return new int[0]; - } - - @Override - public JobExecutionStatus status() { - return JobExecutionStatus.FAILED; - } - }; + return getDefaultJobInfo(JobExecutionStatus.FAILED); } } JobHandle getJobInfo = sparkClient.submit( @@ -180,57 +170,32 @@ private SparkStageInfo getSparkStageInfo(int stageId) { private static class GetJobInfoJob implements Job { private final String clientJobId; - private final int sparkJobId; + private final Integer sparkJobId; - GetJobInfoJob(String clientJobId, int sparkJobId) { + GetJobInfoJob(String clientJobId, Integer sparkJobId) { this.clientJobId = clientJobId; this.sparkJobId = sparkJobId; } @Override public HiveSparkJobInfo call(JobContext jc) throws Exception { - SparkJobInfo jobInfo = jc.sc().statusTracker().getJobInfo(sparkJobId); + SparkJobInfo jobInfo = null; + if (sparkJobId != null) { + jobInfo = jc.sc().statusTracker().getJobInfo(sparkJobId); + } + if (jobInfo == null) { List> list = jc.getMonitoredJobs().get(clientJobId); if (list != null && list.size() == 1) { JavaFutureAction futureAction = list.get(0); if (futureAction.isDone()) { - jobInfo = new SparkJobInfo() { - @Override - public int jobId() { - return sparkJobId; - } - - @Override - public int[] stageIds() { - return new int[0]; - } - - @Override - public JobExecutionStatus status() { - return JobExecutionStatus.SUCCEEDED; - } - }; + jobInfo = getDefaultJobInfo(JobExecutionStatus.SUCCEEDED); } } } - if (jobInfo == null) { - jobInfo = new SparkJobInfo() { - @Override - public int jobId() { - return -1; - } - @Override - public int[] stageIds() { - return new int[0]; - } - - @Override - public JobExecutionStatus status() { - return JobExecutionStatus.UNKNOWN; - } - }; + if (jobInfo == null) { + jobInfo = getDefaultJobInfo(JobExecutionStatus.UNKNOWN); } return new HiveSparkJobInfo(jobInfo); } @@ -281,4 +246,23 @@ public HiveSparkStageInfo call(JobContext jc) throws Exception { return results; } + + private static SparkJobInfo getDefaultJobInfo(final JobExecutionStatus status) { + return new SparkJobInfo() { + @Override + public int jobId() { + return -1; + } + + @Override + public int[] stageIds() { + return new int[0]; + } + + @Override + public JobExecutionStatus status() { + return status; + } + }; + } } diff --git spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java index fd5daf4..a4c5d4b 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java +++ spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java @@ -54,6 +54,16 @@ */ SparkCounters getSparkCounters(); + /** + * Whether the job has successfully finished + */ + boolean isSuccessful(); + + /** + * Whether the job has started on remote driver + */ + boolean hasStarted(); + // TODO: expose job status? } diff --git spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java index 806e1ea..6188eb7 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java +++ spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java @@ -43,7 +43,8 @@ private Throwable error; private final List sparkJobIds; - private SparkCounters sparkCounters; + private volatile SparkCounters sparkCounters; + private volatile boolean started; JobHandleImpl(SparkClientImpl client, String jobId) { this.client = client; @@ -54,6 +55,7 @@ this.completed = false; this.sparkJobIds = new CopyOnWriteArrayList(); sparkCounters = null; + started = false; } /** Requests a running job to be cancelled. */ @@ -122,6 +124,16 @@ public SparkCounters getSparkCounters() { return sparkCounters; } + @Override + public boolean isSuccessful() { + return isDone() && result != null && error == null; + } + + @Override + public boolean hasStarted() { + return started; + } + private T get(long timeout) throws ExecutionException, InterruptedException, TimeoutException { long deadline = System.currentTimeMillis() + timeout; synchronized (monitor) { @@ -162,4 +174,8 @@ void complete(Object result, Throwable error) { public void setSparkCounters(SparkCounters sparkCounters) { this.sparkCounters = sparkCounters; } + + public void setStarted(boolean started) { + this.started = started; + } } diff --git spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java index 68295f3..5c242a4 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java +++ spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java @@ -131,18 +131,48 @@ /** * Inform the client that a new spark job has been submitted for the client job */ - static class JobSubmitted implements Serializable { + static class SparkJobSubmitted implements Serializable { final String clientJobId; final int sparkJobId; - JobSubmitted(String clientJobId, int sparkJobId) { + SparkJobSubmitted(String clientJobId, int sparkJobId) { this.clientJobId = clientJobId; this.sparkJobId = sparkJobId; } - JobSubmitted() { + SparkJobSubmitted() { this(null, -1); } } + /** + * To acknowledge that a job request has been received + */ + static class JobReceived implements Serializable { + final String clientJobId; + + JobReceived(String clientJobId) { + this.clientJobId = clientJobId; + } + + JobReceived() { + this(null); + } + } + + /** + * Inform the client that the job has started on remote driver + */ + static class JobStarted implements Serializable { + final String clientJobId; + + JobStarted(String clientJobId) { + this.clientJobId = clientJobId; + } + + JobStarted() { + this(null); + } + } + } diff --git spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java index cbe06d8..f0879d3 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java +++ spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java @@ -184,6 +184,7 @@ public void onReceive(Object message) throws Exception { } else if (message instanceof Protocol.JobRequest) { Protocol.JobRequest req = (Protocol.JobRequest) message; LOG.info("Received job request {}", req.id); + client.tell(new Protocol.JobReceived(req.id), actor); JobWrapper wrapper = new JobWrapper(req); activeJobs.put(req.id, wrapper); wrapper.submit(); @@ -211,6 +212,7 @@ public void onReceive(Object message) throws Exception { @Override public Void call() throws Exception { try { + client.tell(new Protocol.JobStarted(req.id), actor); jc.setMonitorCb(new MonitorCallback() { @Override public void call(JavaFutureAction future, SparkCounters sparkCounters) { @@ -261,7 +263,7 @@ private void monitorJob(JavaFutureAction job, SparkCounters sparkCounters) { } jc.getMonitoredJobs().get(req.id).add(job); this.sparkCounters = sparkCounters; - client.tell(new Protocol.JobSubmitted(req.id, job.jobIds().get(0)), actor); + client.tell(new Protocol.SparkJobSubmitted(req.id, job.jobIds().get(0)), actor); } } diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java index b579e84..9ba475e 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java +++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; -import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.Serializable; import java.io.Writer; @@ -345,16 +344,28 @@ public void onReceive(Object message) throws Exception { } else { LOG.warn("Received result for unknown job {}", jr.id); } - } else if (message instanceof Protocol.JobSubmitted) { - Protocol.JobSubmitted jobSubmitted = (Protocol.JobSubmitted) message; - JobHandleImpl handle = jobs.get(jobSubmitted.clientJobId); + } else if (message instanceof Protocol.SparkJobSubmitted) { + Protocol.SparkJobSubmitted sparkJobSubmitted = (Protocol.SparkJobSubmitted) message; + JobHandleImpl handle = jobs.get(sparkJobSubmitted.clientJobId); if (handle != null) { LOG.info("Received spark job ID: {} for {}", - jobSubmitted.sparkJobId, jobSubmitted.clientJobId); - handle.getSparkJobIds().add(jobSubmitted.sparkJobId); + sparkJobSubmitted.sparkJobId, sparkJobSubmitted.clientJobId); + handle.getSparkJobIds().add(sparkJobSubmitted.sparkJobId); } else { LOG.warn("Received spark job ID: {} for unknown job {}", - jobSubmitted.sparkJobId, jobSubmitted.clientJobId); + sparkJobSubmitted.sparkJobId, sparkJobSubmitted.clientJobId); + } + } else if (message instanceof Protocol.JobReceived) { + Protocol.JobReceived jobReceived = (Protocol.JobReceived) message; + LOG.info("Job " + jobReceived.clientJobId + " has been received by remote driver."); + } else if (message instanceof Protocol.JobStarted) { + Protocol.JobStarted jobStarted = (Protocol.JobStarted) message; + JobHandleImpl handle = jobs.get(jobStarted.clientJobId); + if (handle != null) { + LOG.info("Job " + jobStarted.clientJobId + " has started on remote driver."); + handle.setStarted(true); + } else { + LOG.warn("Received JobStarted message for unknown job {}", jobStarted.clientJobId); } } }