diff --git data/conf/spark/hive-site.xml data/conf/spark/hive-site.xml index e42aa22..2f8c4de 100644 --- data/conf/spark/hive-site.xml +++ data/conf/spark/hive-site.xml @@ -210,4 +210,14 @@ ${spark.home} + + spark.driver.extraClassPath + ${maven.local.repository}/org/apache/hive/hive-it-util/${hive.version}/hive-it-util-${hive.version}.jar:${maven.local.repository}/org/apache/hive/hive-exec/${hive.version}/hive-exec-${hive.version}.jar + + + + hive.aux.jars.path + ${maven.local.repository}/org/apache/hive/hive-it-util/${hive.version}/hive-it-util-${hive.version}.jar + + diff --git spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java index cbe06d8..a402ad8 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java +++ spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java @@ -234,6 +234,7 @@ public void call(JavaFutureAction future, SparkCounters sparkCounters) { // Catch throwables in a best-effort to report job status back to the client. It's // re-thrown so that the executor can destroy the affected thread (or the JVM can // die or whatever would happen if the throwable bubbled up). + LOG.info("Failed to run job " + req.id, t); client.tell(new Protocol.JobResult(req.id, null, t, null), actor); throw new ExecutionException(t); } finally { diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java index 2d65d0f..9676f29 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java +++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java @@ -55,6 +55,7 @@ private static final Logger LOG = LoggerFactory.getLogger(SparkClientImpl.class); private static final String DEFAULT_CONNECTION_TIMEOUT = "60"; // In seconds + private static final long DEFAULT_SHUTDOWN_TIMEOUT = 10000; // In milliseconds private final Map conf; private final AtomicInteger childIdGenerator; @@ -110,11 +111,16 @@ public void stop() { remoteRef.tell(new Protocol.EndSession(), clientRef); } unbind(clientRef); + long endTime = System.currentTimeMillis() + DEFAULT_SHUTDOWN_TIMEOUT; try { - driverThread.join(); // TODO: timeout? + driverThread.join(DEFAULT_SHUTDOWN_TIMEOUT); // TODO: timeout? } catch (InterruptedException ie) { LOG.debug("Interrupted before driver thread was finished."); } + if (endTime - System.currentTimeMillis() <= 0) { + LOG.debug("Shut down time out."); + driverThread.interrupt(); + } } @Override @@ -268,6 +274,8 @@ public void run() { if (exitCode != 0) { LOG.warn("Child process exited with code {}.", exitCode); } + } catch (InterruptedException ie) { + child.destroy(); } catch (Exception e) { LOG.warn("Exception while waiting for child process.", e); }