diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 30e53d2..3a1577f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -31,6 +31,7 @@ import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -110,7 +111,12 @@ private void createRemoteClient() throws Exception {
       int curExecutors = 0;
       long ts = System.currentTimeMillis();
       do {
-        curExecutors = getExecutorCount();
+        try {
+          curExecutors = getExecutorCount(MAX_PREWARM_TIME, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+          // let's not fail on a future timeout, since pre-warm has its own timeout
+          LOG.warn("Timed out getting executor count.", e);
+        }
         if (curExecutors >= minExecutors) {
           LOG.info("Finished prewarming Spark executors. The current number of executors is " + curExecutors);
           return;
@@ -118,8 +124,8 @@ private void createRemoteClient() throws Exception {
         Thread.sleep(500); // sleep half a second
       } while (System.currentTimeMillis() - ts < MAX_PREWARM_TIME);
 
-      LOG.info("Timeout (" + MAX_PREWARM_TIME +
-          "s) occurred while prewarming executors. The current number of executors is " + curExecutors);
+      LOG.info("Timeout (" + MAX_PREWARM_TIME / 1000 + "s) occurred while prewarming executors. "
+          + "The current number of executors is " + curExecutors);
     }
   }
 
@@ -143,6 +149,11 @@ private int getExecutorsToWarm() {
     return minExecutors;
   }
 
+  private int getExecutorCount(long timeout, TimeUnit unit) throws Exception {
+    Future<Integer> handler = remoteClient.getExecutorCount();
+    return handler.get(timeout, unit);
+  }
+
   @Override
   public SparkConf getSparkConf() {
     return sparkConf;
@@ -150,8 +161,7 @@ public SparkConf getSparkConf() {
 
   @Override
   public int getExecutorCount() throws Exception {
-    Future<Integer> handler = remoteClient.getExecutorCount();
-    return handler.get(sparkClientTimtout, TimeUnit.SECONDS).intValue();
+    return getExecutorCount(sparkClientTimtout, TimeUnit.SECONDS);
   }
 
   @Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 26cce1b..7f87adf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -105,6 +105,9 @@ public int execute(DriverContext driverContext) {
         }
         LOG.info("Execution completed successfully");
       } else if (rc == 2) { // Cancel job if the monitor found job submission timeout.
+        // TODO: If the timeout is because of lack of resources in the cluster, we should
+        // ideally also cancel the app request here. But without facilities from Spark or YARN,
+        // it's difficult to do it on the Hive side alone. See HIVE-12650.
         jobRef.cancelJob();
       }
       sparkJobStatus.cleanup();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
index d109c6f..5f0352a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java
@@ -59,7 +59,7 @@ public int startMonitor() {
       if (state == null) {
         long timeCount = (System.currentTimeMillis() - startTime)/1000;
         if (timeCount > monitorTimeoutInteval) {
-          LOG.info("Job hasn't been submitted after " + timeCount + "s. Aborting it.");
+          console.printError("Job hasn't been submitted after " + timeCount + "s. Aborting it.");
           console.printError("Status: " + state);
           running = false;
           done = true;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index 6990e80..11f263b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -66,7 +66,10 @@ public int startMonitor() {
         case QUEUED:
           long timeCount = (System.currentTimeMillis() - startTime) / 1000;
           if ((timeCount > monitorTimeoutInteval)) {
-            LOG.info("Job hasn't been submitted after " + timeCount + "s. Aborting it.");
+            console.printError("Job hasn't been submitted after " + timeCount + "s. Aborting it.\n"
+                + "Possible reasons include network issues, errors in the remote driver, "
+                + "or the cluster having no available resources, etc.\n"
+                + "Please check YARN or Spark driver's logs for further information.");
             console.printError("Status: " + state);
             running = false;
             done = true;
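
Note (not part of the patch): a minimal, standalone sketch of the pre-warm polling pattern the RemoteHiveSparkClient change introduces. The class name, constant values, and the plain ExecutorService standing in for remoteClient.getExecutorCount() are all hypothetical; the point is that each Future.get() is bounded by MAX_PREWARM_TIME and a TimeoutException on a single poll is only logged, because the outer do/while already enforces the overall pre-warm deadline.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PrewarmPollSketch {
  private static final long MAX_PREWARM_TIME = 5000L; // ms, hypothetical value
  private static final int MIN_EXECUTORS = 2;         // hypothetical value

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    int curExecutors = 0;
    long ts = System.currentTimeMillis();
    try {
      do {
        // Stand-in for remoteClient.getExecutorCount(): an asynchronous count lookup.
        Future<Integer> handler = pool.submit(() -> 1);
        try {
          // Bound each poll by the pre-warm budget rather than an unbounded get().
          curExecutors = handler.get(MAX_PREWARM_TIME, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
          // Don't fail on a single slow poll; the loop condition below is the real deadline.
          System.err.println("Timed out getting executor count: " + e);
        }
        if (curExecutors >= MIN_EXECUTORS) {
          System.out.println("Finished prewarming. Executors: " + curExecutors);
          return;
        }
        Thread.sleep(500); // sleep half a second between polls
      } while (System.currentTimeMillis() - ts < MAX_PREWARM_TIME);
      System.out.println("Timeout (" + MAX_PREWARM_TIME / 1000
          + "s) occurred while prewarming. Executors: " + curExecutors);
    } finally {
      pool.shutdownNow();
    }
  }
}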