diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java index e8ca42a..74fea23 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java +++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java @@ -49,7 +49,9 @@ import java.util.Map; import java.util.Properties; import java.util.UUID; +import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -70,7 +72,7 @@ private static final Logger LOG = LoggerFactory.getLogger(SparkClientImpl.class); - private static final long DEFAULT_SHUTDOWN_TIMEOUT = 10000; // In milliseconds + private static final long DEFAULT_REDIRECTOR_TIMEOUT = 60000; // In milliseconds private static final long MAX_ERR_LOG_LINES_FOR_RPC = 1000; private static final String OSX_TEST_OPTS = "SPARK_OSX_TEST_OPTS"; @@ -113,12 +115,6 @@ LOG.error("Error while waiting for client to connect.", e); } driverThread.interrupt(); - try { - driverThread.join(); - } catch (InterruptedException ie) { - // Give up. - LOG.debug("Interrupted before driver thread was finished."); - } throw Throwables.propagate(e); } @@ -148,25 +144,22 @@ public void rpcClosed(Rpc rpc) { public void stop() { if (isAlive) { isAlive = false; + Future endSessionFuture = null; try { - protocol.endSession(); + endSessionFuture = protocol.endSession(); + endSessionFuture.get(10, TimeUnit.SECONDS); } catch (Exception e) { LOG.warn("Exception while waiting for end session reply.", e); } finally { + if (endSessionFuture != null && !endSessionFuture.isDone()) + endSessionFuture.cancel(true); driverRpc.close(); } } - long endTime = System.currentTimeMillis() + DEFAULT_SHUTDOWN_TIMEOUT; - try { - driverThread.join(DEFAULT_SHUTDOWN_TIMEOUT); - } catch (InterruptedException ie) { - LOG.debug("Interrupted before driver thread was finished."); - } - if (endTime - System.currentTimeMillis() <= 0) { - LOG.warn("Timed out shutting down remote driver, interrupting..."); - driverThread.interrupt(); - } + // Simply interrupt the driver thread + // It will wait until we have anything else to read from the process streams + driverThread.interrupt(); } @Override @@ -448,8 +441,10 @@ public void run() { final Process child = pb.start(); int childId = childIdGenerator.incrementAndGet(); final List childErrorLog = new ArrayList(); - redirect("stdout-redir-" + childId, new Redirector(child.getInputStream())); - redirect("stderr-redir-" + childId, new Redirector(child.getErrorStream(), childErrorLog)); + final Redirector stdoutRedirector = new Redirector(child.getInputStream()); + final Thread stdoutThread = redirect("stdout-redir-" + childId, stdoutRedirector); + final Redirector stderrRedirector = new Redirector(child.getErrorStream(), childErrorLog); + final Thread stderrThread = redirect("stderr-redir-" + childId, stderrRedirector); runnable = new Runnable() { @Override @@ -468,11 +463,22 @@ public void run() { LOG.warn("Child process exited with code {}", exitCode); } } catch (InterruptedException ie) { - LOG.warn("Waiting thread interrupted, killing child process."); - Thread.interrupted(); - child.destroy(); + LOG.debug("Waiting thread interrupted, killing child process."); } catch (Exception e) { LOG.warn("Exception while waiting for child process.", e); + } finally { + stdoutRedirector.stop(); + stderrRedirector.stop(); + try { + stdoutThread.join(); + stderrThread.join(); + } catch (InterruptedException e) { + LOG.warn("Interrupted during waiting for the redirect threads", e); + stdoutThread.interrupt(); + stderrThread.interrupt(); + } + + child.destroy(); } } }; @@ -485,11 +491,12 @@ public void run() { return thread; } - private void redirect(String name, Redirector redirector) { + private Thread redirect(String name, Redirector redirector) { Thread thread = new Thread(redirector); thread.setName(name); thread.setDaemon(true); thread.start(); + return thread; } private class ClientProtocol extends BaseProtocol { @@ -541,7 +548,21 @@ void cancel(String jobId) { } Future endSession() { - return driverRpc.call(new EndSession()); + driverRpc.call(new EndSession()); + + // Cannot use the Future of call(Object) in case of EndSession as it shuts + // down the remote driver therefore, Future.get() will never respond + return Executors.newSingleThreadExecutor().submit(new Runnable() { + @Override + public void run() { + try { + while (driverRpc.isActive() && !Thread.interrupted()) { + Thread.sleep(10); + } + } catch (InterruptedException e) { + } + } + }); } private void handle(ChannelHandlerContext ctx, Error msg) { @@ -599,11 +620,19 @@ private void handle(ChannelHandlerContext ctx, JobSubmitted msg) { private final BufferedReader in; private List errLogs; private int numErrLogLines = 0; + private final StringBuilder lineBuilder = new StringBuilder(); + private final char[] buffer = new char[4096]; + private boolean eof; + private volatile boolean stopped; Redirector(InputStream in) { this.in = new BufferedReader(new InputStreamReader(in)); } + public void stop() { + stopped = true; + } + Redirector(InputStream in, List errLogs) { this.in = new BufferedReader(new InputStreamReader(in)); this.errLogs = errLogs; @@ -612,20 +641,105 @@ private void handle(ChannelHandlerContext ctx, JobSubmitted msg) { @Override public void run() { try { - String line = null; - while ((line = in.readLine()) != null) { - LOG.info(line); - if (errLogs != null) { - if (numErrLogLines++ < MAX_ERR_LOG_LINES_FOR_RPC) { - errLogs.add(line); - } - } + String line; + while ((line = readLineInterruptible(0)) != null) { + processLine(line); } + + LOG.debug("Continue reading stream after Spark client is stopped"); + long end = System.currentTimeMillis() + DEFAULT_REDIRECTOR_TIMEOUT; + while ((line = readLineInterruptible(end)) != null) { + processLine(line); + } + } catch (Exception e) { + // Write out the already buffered lines + eof = true; + for (String line = nextLine(); line != null; line = nextLine()) + processLine(line); + LOG.warn("Error in redirector thread.", e); } } + private void processLine(String line) { + LOG.info(line); + if (errLogs != null && numErrLogLines++ < MAX_ERR_LOG_LINES_FOR_RPC) { + errLogs.add(line); + } + } + + /** + * Returns the next line from the stream until the specified end time (in + * millis). If end is 0L it will return null in case of the flag + * {@link #stopped} is true. It stops reading from the stream and starts + * returning the content of the builder in case of this thread is + * interrupted. + */ + private String readLineInterruptible(long end) throws IOException { + try { + while (!eof) { + if (Thread.interrupted()) { + LOG.warn( + "Redirector interrupted during reading from stream output"); + eof = true; + break; + } + + String ret = nextLine(); + if (ret != null) + return ret; + + while (!in.ready()) { + Thread.sleep(10); + if (end == 0) { + if (stopped) + return null; + } else if (System.currentTimeMillis() >= end) { + LOG.debug("Redirector timeout during reading from stream output"); + eof = true; + return nextLine(); + } + } + + int l = in.read(buffer); + if (l < 0) { + LOG.debug("Redirector reached EOF"); + eof = true; + return nextLine(); + } + + lineBuilder.append(buffer, 0, l); + } + } catch (InterruptedException e) { + LOG.warn("Redirector interrupted during waiting for stream output"); + eof = true; + } + return nextLine(); + } + + /** + * Returns the next line from the builder already read from the stream or + * null if it does not contain a full line ('\n'). In case of {@link #eof} + * the last line will be returned even without a '\n'. + */ + private String nextLine() { + int nl = -1; + for (int i = 0, n = lineBuilder.length(); i < n; ++i) + if (lineBuilder.charAt(i) == '\n') { + nl = i; + break; + } + + if (nl < 0) + return (eof && lineBuilder.length() > 0) ? lineBuilder.toString() + : null; + + String ret = lineBuilder.substring(0, nl); + lineBuilder.delete(0, nl + 1); + return ret; + } + } private static class AddJarJob implements Job {