diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java index f5b1e48..c20dead 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java @@ -323,7 +323,7 @@ public void call(JavaFutureAction future, private final BaseProtocol.JobRequest req; private final List> jobs; - private final AtomicInteger completed; + private int completed; private SparkCounters sparkCounters; private Set cachedRDDIds; @@ -332,7 +332,7 @@ public void call(JavaFutureAction future, JobWrapper(BaseProtocol.JobRequest req) { this.req = req; this.jobs = Lists.newArrayList(); - this.completed = new AtomicInteger(); + completed = 0; this.sparkCounters = null; this.cachedRDDIds = null; } @@ -351,23 +351,19 @@ public void call(JavaFutureAction future, }); T result = req.job.call(jc); - synchronized (completed) { - while (completed.get() < jobs.size()) { - LOG.debug("Client job {} finished, {} of {} Spark jobs finished.", - req.id, completed.get(), jobs.size()); - completed.wait(); - } + // In case the job is empty, there won't be JobStart/JobEnd events. The only way + // to know if the job has finished is to check the futures here ourselves. + for (JavaFutureAction future : jobs) { + future.get(); + completed++; + LOG.debug("Client job {}: {} of {} Spark jobs finished.", + req.id, completed, jobs.size()); } SparkCounters counters = null; if (sparkCounters != null) { counters = sparkCounters.snapshot(); } - // make sure job has really succeeded - // at this point, future.get shall not block us - for (JavaFutureAction future : jobs) { - future.get(); - } protocol.jobFinished(req.id, result, null, counters); } catch (Throwable t) { // Catch throwables in a best-effort to report job status back to the client. 
@@ -389,13 +385,6 @@ void submit() { this.future = executor.submit(this); } - void jobDone() { - synchronized (completed) { - completed.incrementAndGet(); - completed.notifyAll(); - } - } - /** * Release cached RDDs as soon as the job is done. * This is different from local Spark client so as @@ -449,11 +438,6 @@ public void onJobEnd(SparkListenerJobEnd jobEnd) { } } } - - String clientId = getClientId(jobEnd.jobId()); - if (clientId != null) { - activeJobs.get(clientId).jobDone(); - } } @Override