diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java index e8ca42a..52b62a5 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java +++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java @@ -70,7 +70,7 @@ private static final Logger LOG = LoggerFactory.getLogger(SparkClientImpl.class); - private static final long DEFAULT_SHUTDOWN_TIMEOUT = 10000; // In milliseconds + private static final long DEFAULT_REDIRECTOR_TIMEOUT = 60000; // In milliseconds private static final long MAX_ERR_LOG_LINES_FOR_RPC = 1000; private static final String OSX_TEST_OPTS = "SPARK_OSX_TEST_OPTS"; @@ -88,7 +88,7 @@ private final Map> jobs; private final Rpc driverRpc; private final ClientProtocol protocol; - private volatile boolean isAlive; + private volatile boolean isAlive = true; SparkClientImpl(RpcServer rpcServer, Map conf, HiveConf hiveConf) throws IOException, SparkException { this.conf = conf; @@ -131,7 +131,6 @@ public void rpcClosed(Rpc rpc) { } } }); - isAlive = true; } @Override @@ -157,16 +156,9 @@ public void stop() { } } - long endTime = System.currentTimeMillis() + DEFAULT_SHUTDOWN_TIMEOUT; - try { - driverThread.join(DEFAULT_SHUTDOWN_TIMEOUT); - } catch (InterruptedException ie) { - LOG.debug("Interrupted before driver thread was finished."); - } - if (endTime - System.currentTimeMillis() <= 0) { - LOG.warn("Timed out shutting down remote driver, interrupting..."); - driverThread.interrupt(); - } + // Simply interrupt the driver thread + // It will wait until we have anything else to read from the process streams + driverThread.interrupt(); } @Override @@ -448,8 +440,8 @@ public void run() { final Process child = pb.start(); int childId = childIdGenerator.incrementAndGet(); final List childErrorLog = new ArrayList(); - redirect("stdout-redir-" + childId, new Redirector(child.getInputStream())); - redirect("stderr-redir-" + childId, new Redirector(child.getErrorStream(), childErrorLog)); + final Thread stdoutThread = redirect("stdout-redir-" + childId, new Redirector(child.getInputStream())); + final Thread stderrThread = redirect("stderr-redir-" + childId, new Redirector(child.getErrorStream(), childErrorLog)); runnable = new Runnable() { @Override @@ -468,8 +460,15 @@ public void run() { LOG.warn("Child process exited with code {}", exitCode); } } catch (InterruptedException ie) { - LOG.warn("Waiting thread interrupted, killing child process."); - Thread.interrupted(); + try { + stdoutThread.join(); + stderrThread.join(); + } catch (InterruptedException e) { + LOG.warn("Interrupted during waiting for the redirect threads", e); + stdoutThread.interrupt(); + stderrThread.interrupt(); + } + child.destroy(); } catch (Exception e) { LOG.warn("Exception while waiting for child process.", e); @@ -485,11 +484,12 @@ public void run() { return thread; } - private void redirect(String name, Redirector redirector) { + private Thread redirect(String name, Redirector redirector) { Thread thread = new Thread(redirector); thread.setName(name); thread.setDaemon(true); thread.start(); + return thread; } private class ClientProtocol extends BaseProtocol { @@ -599,6 +599,9 @@ private void handle(ChannelHandlerContext ctx, JobSubmitted msg) { private final BufferedReader in; private List errLogs; private int numErrLogLines = 0; + private final StringBuilder lineBuilder = new StringBuilder(); + private final char[] buffer = new char[4096]; + private boolean eof; Redirector(InputStream in) { this.in = new BufferedReader(new InputStreamReader(in)); @@ -612,20 +615,100 @@ private void handle(ChannelHandlerContext ctx, JobSubmitted msg) { @Override public void run() { try { - String line = null; - while ((line = in.readLine()) != null) { - LOG.info(line); - if (errLogs != null) { - if (numErrLogLines++ < MAX_ERR_LOG_LINES_FOR_RPC) { - errLogs.add(line); - } - } + String line; + while ((line = readLineInterruptible(0)) != null) { + processLine(line); + } + + LOG.debug("Continue reading stream after Spark client is stopped"); + long end = System.currentTimeMillis() + DEFAULT_REDIRECTOR_TIMEOUT; + while ((line = readLineInterruptible(end)) != null) { + processLine(line); } + } catch (Exception e) { LOG.warn("Error in redirector thread.", e); } } + private void processLine(String line) { + LOG.info(line); + if (errLogs != null && numErrLogLines++ < MAX_ERR_LOG_LINES_FOR_RPC) { + errLogs.add(line); + } + } + + /** + * Returns the next line from the stream until the specified end time (in + * millis). If end is 0L it will return null in case of the flag + * {@link SparkClientImpl#isAlive} is false. It stops reading from the + * stream and starts returning the content of the builder if this thread is + * interrupted. + */ + private String readLineInterruptible(long end) throws IOException { + try { + while (!eof) { + if (Thread.interrupted()) { + LOG.warn( + "Redirector interrupted during reading from stream output"); + eof = true; + break; + } + + String ret = nextLine(); + if (ret != null) + return ret; + + while (!in.ready()) { + Thread.sleep(10); + if (end == 0) { + if (!isAlive) + return null; + } else if (System.currentTimeMillis() >= end) { + LOG.debug("Redirector timeout during reading from stream output"); + eof = true; + return nextLine(); + } + } + + int l = in.read(buffer); + if (l < 0) { + LOG.debug("Redirector reached EOF"); + eof = true; + return nextLine(); + } + + lineBuilder.append(buffer, 0, l); + } + } catch (InterruptedException e) { + LOG.warn("Redirector interrupted during waiting for stream output"); + eof = true; + } + return nextLine(); + } + + /** + * Returns the next line from the builder already read from the stream or + * null if it does not contain a full line ('\n'). In case of {@link #eof} + * the last line will be returned even without a '\n'. + */ + private String nextLine() { + int nl = -1; + for (int i = 0, n = lineBuilder.length(); i < n; ++i) + if (lineBuilder.charAt(i) == '\n') { + nl = i; + break; + } + + if (nl < 0) + return (eof && lineBuilder.length() > 0) ? lineBuilder.toString() + : null; + + String ret = lineBuilder.substring(0, nl); + lineBuilder.delete(0, nl + 1); + return ret; + } + } private static class AddJarJob implements Job {