diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 22f052a..c16e4e4 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1964,7 +1964,12 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
     TEZ_EXEC_INPLACE_PROGRESS(
         "hive.tez.exec.inplace.progress", true,
-        "Updates tez job execution progress in-place in the terminal.")
+        "Updates tez job execution progress in-place in the terminal."),
+    SPARK_CLIENT_TIMEOUT(
+        "hive.spark.client.timeout", "60s",
+        new TimeValidator(TimeUnit.SECONDS),
+        "Timeout in seconds for remote Spark client requests, such as getting the executor count.")
     ;
 
   public final String varname;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 5d6a02c..cd0f771 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -24,11 +24,13 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.compress.utils.CharsetNames;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkException;
 
@@ -39,22 +41,22 @@
   private static final String SPARK_DEFAULT_MASTER = "local";
   private static final String SPARK_DEFAULT_APP_NAME = "Hive on Spark";
 
-  public static HiveSparkClient createHiveSparkClient(Configuration configuration)
+  public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf)
     throws IOException, SparkException {
-    Map<String, String> conf = initiateSparkConf(configuration);
+    Map<String, String> sparkConf = initiateSparkConf(hiveconf);
     // Submit spark job through local spark context while spark master is local mode, otherwise submit
     // spark job through remote spark context.
-    String master = conf.get("spark.master");
+    String master = sparkConf.get("spark.master");
     if (master.equals("local") || master.startsWith("local[")) {
       // With local spark context, all user sessions share the same spark context.
-      return LocalHiveSparkClient.getInstance(generateSparkConf(conf));
+      return LocalHiveSparkClient.getInstance(generateSparkConf(sparkConf));
     } else {
-      return new RemoteHiveSparkClient(conf);
+      return new RemoteHiveSparkClient(hiveconf, sparkConf);
     }
   }
 
-  public static Map<String, String> initiateSparkConf(Configuration hiveConf) {
+  public static Map<String, String> initiateSparkConf(HiveConf hiveConf) {
     Map<String, String> sparkConf = new HashMap<String, String>();
 
     // set default spark configurations.
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index e1946d5..f736191 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -71,12 +71,13 @@
   private transient SparkClient remoteClient;
   private transient SparkConf sparkConf;
+  private transient HiveConf hiveConf;
 
   private transient List<String> localJars = new ArrayList<String>();
-
   private transient List<String> localFiles = new ArrayList<String>();
 
-  RemoteHiveSparkClient(Map<String, String> conf) throws IOException, SparkException {
+  RemoteHiveSparkClient(HiveConf hiveConf, Map<String, String> conf) throws IOException, SparkException {
+    this.hiveConf = hiveConf;
     sparkConf = HiveSparkClientFactory.generateSparkConf(conf);
     remoteClient = SparkClientFactory.createClient(conf);
   }
@@ -88,8 +89,9 @@ public SparkConf getSparkConf() {
 
   @Override
   public int getExecutorCount() throws Exception {
+    long timeout = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_CLIENT_TIMEOUT, TimeUnit.SECONDS);
     Future<Integer> handler = remoteClient.getExecutorCount();
-    return handler.get(5, TimeUnit.SECONDS).intValue();
+    return handler.get(timeout, TimeUnit.SECONDS).intValue();
   }
 
   @Override
@@ -109,9 +111,11 @@ public SparkJobRef execute(final DriverContext driverContext, final SparkWork sp
     byte[] scratchDirBytes = KryoSerializer.serialize(emptyScratchDir);
     byte[] sparkWorkBytes = KryoSerializer.serialize(sparkWork);
 
+    long timeout = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_CLIENT_TIMEOUT, TimeUnit.SECONDS);
+
     JobHandle<Serializable> jobHandle = remoteClient.submit(
         new JobStatusJob(jobConfBytes, scratchDirBytes, sparkWorkBytes));
-    return new SparkJobRef(jobHandle.getClientJobId(), new RemoteSparkJobStatus(remoteClient, jobHandle));
+    return new SparkJobRef(jobHandle.getClientJobId(), new RemoteSparkJobStatus(remoteClient, jobHandle, timeout));
   }
 
   private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
index 6217de4..13bfb66 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -44,6 +44,7 @@
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Used with remote spark client.
@@ -57,10 +58,12 @@
   private final long startTime;
   private final SparkClient sparkClient;
   private final JobHandle<Serializable> jobHandle;
+  private final transient long sparkClientTimeoutInSeconds;
 
-  public RemoteSparkJobStatus(SparkClient sparkClient, JobHandle<Serializable> jobHandle) {
+  public RemoteSparkJobStatus(SparkClient sparkClient, JobHandle<Serializable> jobHandle, long timeoutInSeconds) {
     this.sparkClient = sparkClient;
     this.jobHandle = jobHandle;
+    this.sparkClientTimeoutInSeconds = timeoutInSeconds;
     startTime = System.currentTimeMillis();
   }
 
@@ -161,7 +164,7 @@ public JobExecutionStatus status() {
     JobHandle<SparkJobInfo> getJobInfo = sparkClient.submit(
         new GetJobInfoJob(jobHandle.getClientJobId(), sparkJobId));
     try {
-      return getJobInfo.get();
+      return getJobInfo.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS);
     } catch (Throwable t) {
       LOG.warn("Error getting job info", t);
       return null;
@@ -171,7 +174,7 @@
   private SparkStageInfo getSparkStageInfo(int stageId) {
     JobHandle<SparkStageInfo> getStageInfo = sparkClient.submit(new GetStageInfoJob(stageId));
     try {
-      return getStageInfo.get();
+      return getStageInfo.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS);
     } catch (Throwable t) {
       LOG.warn("Error getting stage info", t);
       return null;
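
Usage note (illustrative, not part of the patch): this change replaces the hard-coded 5-second wait in getExecutorCount() with the new hive.spark.client.timeout setting, and also bounds the previously unbounded get() calls in RemoteSparkJobStatus. A minimal sketch of how the knob is read and overridden, assuming the patch is applied; the demo class name is hypothetical:

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hive.conf.HiveConf;

// Hypothetical demo class, not part of the patch.
public class SparkClientTimeoutDemo {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Defaults to "60s" as declared on HiveConf.ConfVars.SPARK_CLIENT_TIMEOUT.
    long def = conf.getTimeVar(HiveConf.ConfVars.SPARK_CLIENT_TIMEOUT, TimeUnit.SECONDS);
    // Roughly equivalent to "SET hive.spark.client.timeout=120s;" in a Hive session.
    conf.setTimeVar(HiveConf.ConfVars.SPARK_CLIENT_TIMEOUT, 120, TimeUnit.SECONDS);
    // RemoteHiveSparkClient resolves the value the same way before each remote call.
    long effective = conf.getTimeVar(HiveConf.ConfVars.SPARK_CLIENT_TIMEOUT, TimeUnit.SECONDS);
    System.out.println("default=" + def + "s, effective=" + effective + "s"); // 60s, 120s
  }
}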