diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java index ae78bc376a569fc00aa3f291713762e30ce22bfd..dfe263fb1a6511af7028861a9759b9f06722730c 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java +++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java @@ -42,6 +42,7 @@ import java.io.Writer; import java.net.URI; import java.net.URL; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; @@ -68,6 +69,7 @@ private static final Logger LOG = LoggerFactory.getLogger(SparkClientImpl.class); private static final long DEFAULT_SHUTDOWN_TIMEOUT = 10000; // In milliseconds + private static final long MAX_ERR_LOG_LINES_FOR_RPC = 1000; private static final String OSX_TEST_OPTS = "SPARK_OSX_TEST_OPTS"; private static final String SPARK_HOME_ENV = "SPARK_HOME"; @@ -391,7 +393,6 @@ public void run() { argv.add(numOfExecutors); } } - if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) { try { String currentUser = Utils.getUGI().getShortUserName(); @@ -445,8 +446,9 @@ public void run() { final Process child = pb.start(); int childId = childIdGenerator.incrementAndGet(); - redirect("stdout-redir-" + childId, child.getInputStream()); - redirect("stderr-redir-" + childId, child.getErrorStream()); + final List childErrorLog = new ArrayList(); + redirect("stdout-redir-" + childId, new Redirector(child.getInputStream())); + redirect("stderr-redir-" + childId, new Redirector(child.getErrorStream(), childErrorLog)); runnable = new Runnable() { @Override @@ -454,8 +456,15 @@ public void run() { try { int exitCode = child.waitFor(); if (exitCode != 0) { - rpcServer.cancelClient(clientId, "Child process exited before connecting back"); - LOG.warn("Child process exited with code {}.", exitCode); + StringBuilder errStr = new StringBuilder(); + for (String s : childErrorLog) { + errStr.append(s); + errStr.append('\n'); + } + + rpcServer.cancelClient(clientId, + "Child process exited before connecting back with error log " + errStr.toString()); + LOG.warn("Child process exited with code {}", exitCode); } } catch (InterruptedException ie) { LOG.warn("Waiting thread interrupted, killing child process."); @@ -475,8 +484,8 @@ public void run() { return thread; } - private void redirect(String name, InputStream in) { - Thread thread = new Thread(new Redirector(in)); + private void redirect(String name, Redirector redirector) { + Thread thread = new Thread(redirector); thread.setName(name); thread.setDaemon(true); thread.start(); @@ -587,17 +596,29 @@ private void handle(ChannelHandlerContext ctx, JobSubmitted msg) { private class Redirector implements Runnable { private final BufferedReader in; + private List errLogs; + private int numErrLogLines = 0; Redirector(InputStream in) { this.in = new BufferedReader(new InputStreamReader(in)); } + Redirector(InputStream in, List errLogs) { + this.in = new BufferedReader(new InputStreamReader(in)); + this.errLogs = errLogs; + } + @Override public void run() { try { String line = null; while ((line = in.readLine()) != null) { LOG.info(line); + if (errLogs != null) { + if (numErrLogLines++ < MAX_ERR_LOG_LINES_FOR_RPC) { + errLogs.add(line); + } + } } } catch (Exception e) { LOG.warn("Error in redirector thread.", e);