diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java index c5d4f9ad1f..26f1956328 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java @@ -22,9 +22,14 @@ import static org.apache.hadoop.hive.ql.exec.mr.MapRedTask.HADOOP_OPTS_KEY; import static org.apache.hadoop.hive.ql.exec.mr.MapRedTask.HIVE_SYS_PROP; +import java.io.BufferedReader; +import java.io.EOFException; import java.io.File; import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; import java.io.OutputStream; +import java.io.PrintWriter; import java.io.Serializable; import java.lang.management.ManagementFactory; import java.lang.management.MemoryMXBean; @@ -36,6 +41,7 @@ import java.util.Map; import java.util.Properties; +import jodd.io.StringOutputStream; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileSystem; @@ -324,6 +330,9 @@ public int executeInChildVM(DriverContext driverContext) { // Run ExecDriver in another JVM executor = Runtime.getRuntime().exec(cmdLine, env, new File(workDir)); + redirectLog(Thread.currentThread().getName() + "-stdout", executor.getInputStream()); + redirectLog(Thread.currentThread().getName() + "-stderr", executor.getErrorStream()); + CachingPrintStream errPrintStream = new CachingPrintStream(System.err); StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out); @@ -358,6 +367,28 @@ public int executeInChildVM(DriverContext driverContext) { } } + private void redirectLog(final String logThreadName, final InputStream in) { + Thread logThread = new Thread(new Runnable() { + @Override + public void run() { + try { + BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + String line; + while ((line = reader.readLine()) != null) { + LOG.info(line); + } + }catch (EOFException eof) { + //Ignore + } catch (Exception e) { + LOG.warn("Error in redirector thread.", e); + } + } + }); + logThread.setName(logThreadName); + logThread.setDaemon(true); + logThread.start(); + } + public int executeInProcess(DriverContext driverContext) { // check the local work if (work == null) { @@ -383,14 +414,21 @@ public int executeInProcess(DriverContext driverContext) { console.printInfo(Utilities.now() + "\tEnd of local task; Time Taken: " + Utilities.showTime(elapsed) + " sec."); } catch (Throwable throwable) { + int retVal; + String message; if (throwable instanceof OutOfMemoryError || (throwable instanceof MapJoinMemoryExhaustionError)) { - l4j.error("Hive Runtime Error: Map local work exhausted memory", throwable); - return 3; + message = "Hive Runtime Error: Map local work exhausted memory"; + retVal = 3; } else { - l4j.error("Hive Runtime Error: Map local work failed", throwable); - return 2; + message = "Hive Runtime Error: Map local work failed"; + retVal = 2; } + l4j.error(message, throwable); + StringOutputStream so = new StringOutputStream(); + throwable.printStackTrace(new PrintWriter(so)); + console.printError(message, so.toString()); + return retVal; } return 0; }