diff --git cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java index ca37dc5..9951ea7 100644 --- cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java +++ cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java @@ -94,8 +94,10 @@ public class CliDriver { // shell_cmd = "/bin/bash -c \'" + shell_cmd + "\'"; try { Process executor = Runtime.getRuntime().exec(shell_cmd); - StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, ss.out); - StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, ss.err); + StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, + SessionState.getConsole().getChildOutStream()); + StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, + SessionState.getConsole().getChildErrStream()); outPrinter.start(); errPrinter.start(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java index fefe42d..432b13b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java @@ -18,28 +18,26 @@ package org.apache.hadoop.hive.ql.exec; +import java.io.IOException; import java.io.OutputStream; import java.io.Serializable; -import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Properties; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.ql.QueryPlan; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter; import org.apache.hadoop.hive.ql.metadata.HiveException; import 
org.apache.hadoop.hive.ql.plan.MapredWork; -import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.shims.ShimLoader; -import org.apache.hadoop.hive.ql.DriverContext; -import org.apache.hadoop.hive.ql.Context; -import org.apache.hadoop.fs.ContentSummary; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.mapred.JobConf; /** @@ -87,14 +85,15 @@ public class MapRedTask extends ExecDriver implements Serializable { if (!ctx.isLocalOnlyExecutionMode() && conf.getBoolVar(HiveConf.ConfVars.LOCALMODEAUTO)) { - if (inputSummary == null) + if (inputSummary == null) { inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work, null); + } // at this point the number of reducers is precisely defined in the plan int numReducers = work.getNumReduceTasks(); if (LOG.isDebugEnabled()) { - LOG.debug("Task: " + getId() + ", Summary: " + + LOG.debug("Task: " + getId() + ", Summary: " + inputSummary.getLength() + "," + inputSummary.getFileCount() + "," + numReducers); } @@ -211,9 +210,9 @@ public class MapRedTask extends ExecDriver implements Serializable { executor = Runtime.getRuntime().exec(cmdLine, env); StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), - null, System.out); + null, SessionState.getConsole().getChildOutStream()); StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), - null, System.err); + null, SessionState.getConsole().getChildErrStream()); outPrinter.start(); errPrinter.start(); @@ -239,8 +238,9 @@ public class MapRedTask extends ExecDriver implements Serializable { // creating the context can create a bunch of files. 
So make // sure to clear it out - if(ctxCreated) + if(ctxCreated) { ctx.clear(); + } } catch (Exception e) { LOG.error("Exception: " + e.getMessage()); @@ -324,9 +324,10 @@ public class MapRedTask extends ExecDriver implements Serializable { long bytesPerReducer = conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER); int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS); - if(inputSummary == null) + if(inputSummary == null) { // compute the summary and stash it away inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work, null); + } long totalInputFileSize = inputSummary.getLength(); @@ -355,23 +356,26 @@ public class MapRedTask extends ExecDriver implements Serializable { long maxTasks = conf.getIntVar(HiveConf.ConfVars.LOCALMODEMAXTASKS); // check for max input size - if (inputSummary.getLength() > maxBytes) + if (inputSummary.getLength() > maxBytes) { return "Input Size (= " + inputSummary.getLength() + ") is larger than " + HiveConf.ConfVars.LOCALMODEMAXBYTES.varname + " (= " + maxBytes + ")"; + } // ideally we would like to do this check based on the number of splits // in the absence of an easy way to get the number of splits - do this // based on the total number of files (pessimistically assumming that // splits are equal to number of files in worst case) - if (inputSummary.getFileCount() > maxTasks) + if (inputSummary.getFileCount() > maxTasks) { return "Number of Input Files (= " + inputSummary.getFileCount() + - ") is larger than " + + ") is larger than " + HiveConf.ConfVars.LOCALMODEMAXTASKS.varname + "(= " + maxTasks + ")"; + } // since local mode only runs with 1 reducers - make sure that the // the number of reducers (set by user or inferred) is <=1 - if (numReducers > 1) + if (numReducers > 1) { return "Number of reducers (= " + numReducers + ") is more than 1"; + } return null; } diff --git ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 
a36b06e..ee5ad4f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -62,14 +62,25 @@ public class SessionState { * HiveHistory Object */ protected HiveHistory hiveHist; + /** * Streams to read/write from. */ - public PrintStream out; public InputStream in; + public PrintStream out; public PrintStream err; /** + * Standard output from any child process(es). + */ + public PrintStream childOut; + + /** + * Error output from any child process(es). + */ + public PrintStream childErr; + + /** * type of the command. */ private String commandType; @@ -97,10 +108,11 @@ public class SessionState { } public boolean getIsSilent() { - if(conf != null) + if(conf != null) { return conf.getBoolVar(HiveConf.ConfVars.HIVESESSIONSILENT); - else + } else { return isSilent; + } } public void setIsSilent(boolean isSilent) { @@ -251,6 +263,16 @@ public class SessionState { return ((ss != null) && (ss.err != null)) ? ss.err : System.err; } + public PrintStream getChildOutStream() { + SessionState ss = SessionState.get(); + return ((ss != null) && (ss.childOut != null)) ? ss.childOut : System.out; + } + + public PrintStream getChildErrStream() { + SessionState ss = SessionState.get(); + return ((ss != null) && (ss.childErr != null)) ?
ss.childErr : System.err; + } + public boolean getIsSilent() { SessionState ss = SessionState.get(); // use the session or the one supplied in constructor diff --git ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java index 70d64ef..38212ef 100644 --- ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java +++ ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java @@ -617,9 +617,9 @@ public class QTestUtil { Process executor = Runtime.getRuntime().exec(cmdLine); StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), - null, System.out); + null, SessionState.getConsole().getChildOutStream()); StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), - null, System.err); + null, SessionState.getConsole().getChildErrStream()); outPrinter.start(); errPrinter.start(); @@ -656,9 +656,9 @@ public class QTestUtil { Process executor = Runtime.getRuntime().exec(cmdLine); StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), - null, System.out); + null, SessionState.getConsole().getChildOutStream()); StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), - null, System.err); + null, SessionState.getConsole().getChildErrStream()); outPrinter.start(); errPrinter.start(); @@ -711,9 +711,9 @@ public class QTestUtil { Process executor = Runtime.getRuntime().exec(cmdArray); StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), - null, System.out); + null, SessionState.getConsole().getChildOutStream()); StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), - null, System.err); + null, SessionState.getConsole().getChildErrStream()); outPrinter.start(); errPrinter.start(); @@ -777,9 +777,9 @@ public class QTestUtil { Process executor = Runtime.getRuntime().exec(cmdArray1); StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), - null, System.out); + null, SessionState.getConsole().getChildOutStream()); StreamPrinter 
errPrinter = new StreamPrinter(executor.getErrorStream(), - null, System.err); + null, SessionState.getConsole().getChildErrStream()); outPrinter.start(); errPrinter.start(); @@ -801,9 +801,9 @@ public class QTestUtil { Process executor = Runtime.getRuntime().exec(cmdArray); StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), - null, System.out); + null, SessionState.getConsole().getChildOutStream()); StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), - null, System.err); + null, SessionState.getConsole().getChildErrStream()); outPrinter.start(); errPrinter.start(); @@ -880,9 +880,9 @@ public class QTestUtil { Process executor = Runtime.getRuntime().exec(cmdArray); StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), - null, System.out); + null, SessionState.getConsole().getChildOutStream()); StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), - null, System.err); + null, SessionState.getConsole().getChildErrStream()); outPrinter.start(); errPrinter.start();