diff --git a/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java b/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java index bcc66cf..9517d34 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; +import org.apache.commons.lang3.CharEncoding; import org.apache.hadoop.hive.metastore.api.Schema; import org.apache.hadoop.hive.ql.processors.CommandProcessor; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; @@ -56,6 +57,8 @@ */ private BufferedReader resultReader; + //The file stream opened for the temp file. + private FileOutputStream fileStream = null; protected HiveCommandOperation(HiveSession parentSession, String statement, CommandProcessor commandProcessor, Map confOverlay) { @@ -69,16 +72,19 @@ private void setupSessionIO(SessionState sessionState) { LOG.info("Putting temp output to file " + sessionState.getTmpOutputFile().toString()); sessionState.in = null; // hive server's session input stream is not used // open a per-session file in auto-flush mode for writing temp results - sessionState.out = new PrintStream(new FileOutputStream(sessionState.getTmpOutputFile()), true, "UTF-8"); + fileStream = new FileOutputStream(sessionState.getTmpOutputFile()); + sessionState.out = new PrintStream(fileStream, true, CharEncoding.UTF_8); // TODO: for hadoop jobs, progress is printed out to session.err, // we should find a way to feed back job progress to client - sessionState.err = new PrintStream(System.err, true, "UTF-8"); + sessionState.err = new PrintStream(System.err, true, CharEncoding.UTF_8); } catch (IOException e) { LOG.error("Error in creating temp output file ", e); try { + IOUtils.cleanup(LOG, fileStream); + fileStream = null; sessionState.in = null; - sessionState.out = new PrintStream(System.out, true, "UTF-8"); - sessionState.err = new PrintStream(System.err, true, "UTF-8"); + sessionState.out = new PrintStream(System.out, true, CharEncoding.UTF_8); + sessionState.err = new PrintStream(System.err, true, CharEncoding.UTF_8); } catch (UnsupportedEncodingException ee) { LOG.error("Error creating PrintStream", e); ee.printStackTrace(); @@ -88,10 +94,18 @@ private void setupSessionIO(SessionState sessionState) { } } - private void tearDownSessionIO() { - IOUtils.cleanup(LOG, parentSession.getSessionState().out); - IOUtils.cleanup(LOG, parentSession.getSessionState().err); + if (parentSession.getSessionState().out != null) { + parentSession.getSessionState().out.flush(); + } + + if (parentSession.getSessionState().err != null) { + parentSession.getSessionState().err.flush(); + } + + if (fileStream != null) { + IOUtils.cleanup(LOG, parentSession.getSessionState().out); + } } @Override diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index cc9df76..c57f069 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -18,7 +18,9 @@ package org.apache.hive.service.cli.operation; +import java.io.FileNotFoundException; import java.io.IOException; +import java.io.PrintStream; import java.io.Serializable; import java.io.UnsupportedEncodingException; import java.security.PrivilegedExceptionAction; @@ -31,6 +33,8 @@ import java.util.concurrent.RejectedExecutionException; import org.apache.commons.codec.binary.Base64; +import org.apache.commons.lang3.CharEncoding; +import org.apache.hadoop.hive.common.io.CachingPrintStream; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Schema; @@ -80,6 +84,36 @@ public SQLOperation(HiveSession parentSession, String statement, Map confOverlay, boolean runInBackground) { // TODO: call setRemoteUser in ExecuteStatementOperation or higher. super(parentSession, statement, confOverlay, runInBackground); + setupSessionIO(parentSession.getSessionState()); + } + + private void setupSessionIO(SessionState sessionState) { + try { + sessionState.in = null; // hive server's session input stream is not used + sessionState.out = new PrintStream(System.out, true, CharEncoding.UTF_8); + sessionState.info = new PrintStream(System.err, true, CharEncoding.UTF_8); + sessionState.err = new CachingPrintStream(System.err, true, CharEncoding.UTF_8); + } catch (FileNotFoundException | UnsupportedEncodingException e) { + LOG.error("Error creating PrintStream", e); + e.printStackTrace(); + sessionState.out = null; + sessionState.info = null; + sessionState.err = null; + } + } + + private void tearDownSessionIO() { + if (parentSession.getSessionState().out != null) { + parentSession.getSessionState().out.flush(); + } + + if (parentSession.getSessionState().err != null) { + parentSession.getSessionState().err.flush(); + } + + if (parentSession.getSessionState().info != null) { + parentSession.getSessionState().info.flush(); + } } /*** @@ -300,6 +334,7 @@ public void cancel() throws HiveSQLException { @Override public void close() throws HiveSQLException { cleanup(OperationState.CLOSED); + tearDownSessionIO(); cleanupOperationLog(); }