diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 3295d1dbc5..0c8172c4c8 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4197,7 +4197,7 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "in shuffle. This should result in less shuffled data."),
     SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
         "60s", new TimeValidator(TimeUnit.SECONDS),
-        "Timeout for requests from Hive client to remote Spark driver."),
+        "Timeout for requests between Hive client and remote Spark driver."),
     SPARK_JOB_MONITOR_TIMEOUT("hive.spark.job.monitor.timeout", "60s",
         new TimeValidator(TimeUnit.SECONDS),
         "Timeout for job monitor to get Spark job state."),
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
index caa850c08c..abd42fcccc 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
@@ -35,6 +35,8 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;

 import org.apache.commons.io.FileUtils;
@@ -92,6 +94,8 @@
   public static final String REMOTE_DRIVER_PORT_CONF = "--remote-port";
   public static final String REMOTE_DRIVER_CONF = "--remote-driver-conf";

+  private final long futureTimeout; // Rpc call timeout in milliseconds
+
   private RemoteDriver(String[] args) throws Exception {
     this.activeJobs = Maps.newConcurrentMap();
     this.jcLock = new Object();
@@ -135,6 +139,9 @@ private RemoteDriver(String[] args) throws Exception {
     String secret = mapConf.get(SparkClientFactory.CONF_KEY_SECRET);
     Preconditions.checkArgument(secret != null, "No secret provided.");

+    RpcConfiguration rpcConf = new RpcConfiguration(mapConf);
+    futureTimeout = rpcConf.getFutureTimeoutMs();
+
     int threadCount = new RpcConfiguration(mapConf).getRpcThreadCount();
     this.egroup = new NioEventLoopGroup(
         threadCount,
@@ -232,13 +239,19 @@ private synchronized void shutdown(Throwable error) {
       for (JobWrapper<?> job : activeJobs.values()) {
         cancelJob(job);
       }
+
       if (error != null) {
-        protocol.sendError(error);
+        try {
+          protocol.sendError(error).get(futureTimeout, TimeUnit.MILLISECONDS);
+        } catch(InterruptedException|ExecutionException|TimeoutException e) {
+          // ignore
+        }
       }
       if (jc != null) {
         jc.stop();
       }
       clientRpc.close();
+      egroup.shutdownGracefully();

       synchronized (shutdownLock) {
         shutdownLock.notifyAll();
@@ -265,34 +278,35 @@ private String getArg(String[] args, int keyIdx) {

   private class DriverProtocol extends BaseProtocol {

-    void sendError(Throwable error) {
+    Future sendError(Throwable error) {
       LOG.debug("Send error to Client: {}", Throwables.getStackTraceAsString(error));
-      clientRpc.call(new Error(Throwables.getStackTraceAsString(error)));
+      return clientRpc.call(new Error(Throwables.getStackTraceAsString(error)));
     }

-    void sendErrorMessage(String cause) {
+    Future sendErrorMessage(String cause) {
       LOG.debug("Send error to Client: {}", cause);
-      clientRpc.call(new Error(cause));
+      return clientRpc.call(new Error(cause));
     }

-    <T extends Serializable> void jobFinished(String jobId, T result,
+
+    <T extends Serializable> Future jobFinished(String jobId, T result,
         Throwable error, SparkCounters counters) {
       LOG.debug("Send job({}) result to Client.", jobId);
-      clientRpc.call(new JobResult(jobId, result, error, counters));
+      return clientRpc.call(new JobResult(jobId, result, error, counters));
     }

-    void jobStarted(String jobId) {
-      clientRpc.call(new JobStarted(jobId));
+    Future jobStarted(String jobId) {
+      return clientRpc.call(new JobStarted(jobId));
     }

-    void jobSubmitted(String jobId, int sparkJobId) {
+    Future jobSubmitted(String jobId, int sparkJobId) {
       LOG.debug("Send job({}/{}) submitted to Client.", jobId, sparkJobId);
-      clientRpc.call(new JobSubmitted(jobId, sparkJobId));
+      return clientRpc.call(new JobSubmitted(jobId, sparkJobId));
     }

-    void sendMetrics(String jobId, int sparkJobId, int stageId, long taskId, Metrics metrics) {
+    Future sendMetrics(String jobId, int sparkJobId, int stageId, long taskId, Metrics metrics) {
       LOG.debug("Send task({}/{}/{}/{}) metric to Client.", jobId, sparkJobId, stageId, taskId);
-      clientRpc.call(new JobMetrics(jobId, sparkJobId, stageId, taskId, metrics));
+      return clientRpc.call(new JobMetrics(jobId, sparkJobId, stageId, taskId, metrics));
     }

     private void handle(ChannelHandlerContext ctx, CancelJob msg) {
@@ -550,8 +564,7 @@ public static void main(String[] args) throws Exception {
       // If the main thread throws an exception for some reason, propagate the exception to the
       // client and initiate a safe shutdown
       if (rd.running) {
-        rd.protocol.sendError(e);
-        rd.shutdown(null);
+        rd.shutdown(e);
       }
       throw e;
     }
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
index 090c628001..bd3a7a7321 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
@@ -46,6 +46,7 @@
   private static final Logger LOG = LoggerFactory.getLogger(RpcConfiguration.class);

   public static final ImmutableSet<String> HIVE_SPARK_RSC_CONFIGS = ImmutableSet.of(
+      HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT.varname,
       HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname,
       HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname,
       HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname,
@@ -55,6 +56,7 @@
       HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname
   );
   public static final ImmutableSet<String> HIVE_SPARK_TIME_CONFIGS = ImmutableSet.of(
+      HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT.varname,
       HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname,
       HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname
   );
@@ -71,6 +73,12 @@ public RpcConfiguration(Map<String, String> config) {
     this.config = Collections.unmodifiableMap(config);
   }

+  public long getFutureTimeoutMs() {
+    String value = config.get(HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT.varname);
+    return value != null ? Long.parseLong(value) : DEFAULT_CONF.getTimeVar(
+        HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT, TimeUnit.MILLISECONDS);
+  }
+
   long getConnectTimeoutMs() {
     String value = config.get(HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname);
     return value != null ? Long.parseLong(value) : DEFAULT_CONF.getTimeVar(
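
Note (illustration only, not part of the patch): the change above bounds every shutdown-path RPC with the value of hive.spark.client.future.timeout, which RpcConfiguration.getFutureTimeoutMs() resolves to milliseconds, so a driver whose client has already gone away can no longer block forever in sendError(). The self-contained sketch below mimics that bounded-wait pattern with plain JDK futures; the class name and the 50 ms demo timeout are made up for illustration and are not Hive APIs.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedSendErrorSketch {

  // Demo value; in Hive this would come from hive.spark.client.future.timeout
  // via RpcConfiguration.getFutureTimeoutMs() (default 60s, converted to milliseconds).
  private static final long FUTURE_TIMEOUT_MS = 50;

  // Stand-in for DriverProtocol.sendError(): returns a future that never completes,
  // simulating a client that is already dead and will never acknowledge the RPC.
  private static Future<Void> sendError(Throwable error) {
    return new CompletableFuture<>();
  }

  public static void main(String[] args) {
    try {
      // Wait for the error RPC to be acknowledged, but never longer than the configured
      // timeout, mirroring protocol.sendError(error).get(futureTimeout, TimeUnit.MILLISECONDS).
      sendError(new RuntimeException("job failed")).get(FUTURE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
      // Ignored, as in RemoteDriver.shutdown(): cleanup must proceed regardless.
      System.out.println("sendError not acknowledged in time, continuing shutdown");
    }
    // ...the rest of the shutdown (jc.stop(), clientRpc.close(), egroup.shutdownGracefully())
    // would run here unconditionally.
  }
}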