diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index fa3e048..b55177d 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -792,6 +792,11 @@
HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME("hive.server2.async.exec.keepalive.time", 10),
+ HIVE_SERVER2_LOG_REDIRECTION_ENABLED("hive.server2.log.redirection.enabled", true),
+ HIVE_SERVER2_LOG_DIRECTORY("hive.server2.query.log.dir", "query_logs"),
+ // By default, logs will be purged after a week
+ HIVE_SERVER2_LOG_PURGE_DELAY("hive.server2.log.purge.delay", 7 * 1440 * 60 * 1000L),
+
// HiveServer2 auth configuration
HIVE_SERVER2_AUTHENTICATION("hive.server2.authentication", "NONE",
new StringsValidator("NOSASL", "NONE", "LDAP", "KERBEROS", "CUSTOM")),
diff --git conf/hive-default.xml.template conf/hive-default.xml.template
index c61a0bb..f206c2b 100644
--- conf/hive-default.xml.template
+++ conf/hive-default.xml.template
@@ -2047,6 +2047,28 @@
+ hive.server2.log.redirection.enabled
+ true
+ Set this flag to false to disable HiveServer2 session and operation logs
+
+
+
+ hive.server2.query.log.dir
+ query_logs
+ Top level directory where session and operation level logs are stored if log redirection is
+ enabled.
+
+
+
+
+ hive.server2.log.purge.delay
+ 604800000
+ Duration in milliseconds after which logs of closed or abandoned sessions and operations will be purged.
+ Default is one week
+
+
+
+
hive.plan.serialization.format
kryo
diff --git itests/hive-unit/src/test/java/org/apache/hive/service/cli/TestEmbeddedThriftBinaryCLIService.java itests/hive-unit/src/test/java/org/apache/hive/service/cli/TestEmbeddedThriftBinaryCLIService.java
index ebda296..b8d693a 100644
--- itests/hive-unit/src/test/java/org/apache/hive/service/cli/TestEmbeddedThriftBinaryCLIService.java
+++ itests/hive-unit/src/test/java/org/apache/hive/service/cli/TestEmbeddedThriftBinaryCLIService.java
@@ -18,12 +18,17 @@
package org.apache.hive.service.cli;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.service.cli.thrift.EmbeddedThriftBinaryCLIService;
import org.apache.hive.service.cli.thrift.ThriftCLIService;
import org.apache.hive.service.cli.thrift.ThriftCLIServiceClient;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.UUID;
/**
* TestEmbeddedThriftBinaryCLIService.
@@ -55,5 +60,4 @@ public void setUp() throws Exception {
public void tearDown() throws Exception {
super.tearDown();
}
-
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 62fc150..fbb1901 100644
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -114,7 +114,7 @@
static final private String CLASS_NAME = Driver.class.getName();
static final private Log LOG = LogFactory.getLog(CLASS_NAME);
- static final private LogHelper console = new LogHelper(LOG);
+ private LogHelper console = new LogHelper(LOG);
private static final Object compileMonitor = new Object();
@@ -136,6 +136,8 @@
private int maxthreads;
private static final int SLEEP_TIME = 2000;
protected int tryCount = Integer.MAX_VALUE;
+ private DriverContext driverCxt;
+ private boolean logRedirecionEnabled;
private String userName;
@@ -1470,6 +1472,14 @@ public void launchTask(Task extends Serializable> tsk, String queryId, boolean
console.printInfo("Launching Job " + cxt.getCurJobNo() + " out of " + jobs);
}
tsk.initialize(conf, plan, cxt);
+ if (logRedirecionEnabled) {
+ tsk.setLogRedirectionEnabled(true);
+ tsk.getConsole().setInfoStream(console.getInfoStream());
+ tsk.getConsole().setErrorStream(console.getErrStream());
+ tsk.getConsole().setOutStream(console.getOutStream());
+ tsk.getConsole().setChildErrStream(console.getChildErrStream());
+ tsk.getConsole().setChildOutStream(console.getChildOutStream());
+ }
TaskResult tskRes = new TaskResult();
TaskRunner tskRun = new TaskRunner(tsk, tskRes);
@@ -1663,4 +1673,15 @@ public String getErrorMsg() {
return errorMessage;
}
+ /**
+ * Replaces the console helper this driver prints its messages through.
+ *
+ * @param console the LogHelper to use from now on
+ */
+ public void setConsole(LogHelper console) {
+ this.console = console;
+ }
+
+ /**
+ * @return the current value of the log redirection flag
+ */
+ public boolean isLogRedirecionEnabled() {
+ return logRedirecionEnabled;
+ }
+
+ /**
+ * Sets the log redirection flag. When the flag is true, launchTask copies this driver's
+ * console streams onto the console of each task it initializes.
+ *
+ * @param isLogRedirecionEnabled new value of the flag
+ */
+ public void setLogRedirecionEnabled(boolean isLogRedirecionEnabled) {
+ this.logRedirecionEnabled = isLogRedirecionEnabled;
+ }
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index 655395c..6678c69 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -67,6 +67,7 @@
protected int taskTag;
private boolean isLocalMode =false;
private boolean retryCmdWhenFail = false;
+ protected boolean isLogRedirectionEnabled;
public static final int NO_TAG = 0;
public static final int COMMON_JOIN = 1;
@@ -86,6 +87,14 @@
protected String id;
protected T work;
+ /**
+ * Replaces the console helper of this task. Note that initialize() assigns a fresh
+ * LogHelper, so a console set before initialize() is called gets replaced.
+ *
+ * @param console the LogHelper to use
+ */
+ public void setConsole(LogHelper console) {
+ this.console = console;
+ }
+
+ /**
+ * @return the console helper currently held by this task
+ */
+ public LogHelper getConsole() {
+ return console;
+ }
+
public static enum FeedType {
DYNAMIC_PARTITIONS, // list of dynamic partitions
}
@@ -132,7 +141,6 @@ public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverC
throw new RuntimeException(e);
}
this.driverContext = driverContext;
-
console = new LogHelper(LOG);
}
@@ -510,5 +518,12 @@ void setException(Throwable ex) {
public String toString() {
return getId() + ":" + getType();
+ }
+ /**
+ * @return the current value of this task's log redirection flag
+ */
+ public boolean isLogRedirectionEnabled() {
+ return isLogRedirectionEnabled;
+ }
+
+ /**
+ * Sets this task's log redirection flag.
+ *
+ * @param isLogRedirectionEnabled new value of the flag
+ */
+ public void setLogRedirectionEnabled(boolean isLogRedirectionEnabled) {
+ this.isLogRedirectionEnabled = isLogRedirectionEnabled;
}
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
index a7e2253..1f1eb27 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
@@ -261,16 +261,16 @@ public int execute(DriverContext driverContext) {
// Run ExecDriver in another JVM
executor = Runtime.getRuntime().exec(cmdLine, env, new File(workDir));
- CachingPrintStream errPrintStream =
- new CachingPrintStream(SessionState.getConsole().getChildErrStream());
-
- StreamPrinter outPrinter = new StreamPrinter(
- executor.getInputStream(), null,
- SessionState.getConsole().getChildOutStream());
- StreamPrinter errPrinter = new StreamPrinter(
- executor.getErrorStream(), null,
- errPrintStream);
-
+ StreamPrinter outPrinter = null, errPrinter = null;
+ CachingPrintStream errPrintStream = null;
+ if (isLogRedirectionEnabled) {
+ outPrinter = new StreamPrinter(executor.getInputStream(), null, getConsole().getChildOutStream());
+ errPrintStream = new CachingPrintStream(getConsole().getChildErrStream());
+ } else {
+ outPrinter = new StreamPrinter(executor.getInputStream(), null, SessionState.getConsole().getChildOutStream());
+ errPrintStream = new CachingPrintStream(SessionState.getConsole().getChildErrStream());
+ }
+ errPrinter = new StreamPrinter(executor.getErrorStream(), null, errPrintStream);
outPrinter.start();
errPrinter.start();
diff --git ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 0684aac..dd765fb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -362,6 +362,9 @@ private static String makeSessionId() {
return UUID.randomUUID().toString();
}
+ /**
+ * Installs a console helper specific to this session. Once set, the static getConsole()
+ * returns it whenever tss.get() yields this SessionState, instead of the shared helper.
+ *
+ * @param sessionConsole the LogHelper to use for this session
+ */
+ public void setSessionConsole(LogHelper sessionConsole) {
+ this.sessionConsole = sessionConsole;
+ }
/**
* This class provides helper routines to emit informational and error
* messages to the user and log4j files while obeying the current session's
@@ -379,6 +382,11 @@ private static String makeSessionId() {
protected Log LOG;
protected boolean isSilent;
+ private PrintStream childErrStream;
+ private PrintStream childOutStream;
+ private PrintStream errorStream;
+ private PrintStream outStream;
+ private PrintStream infoStream;
public LogHelper(Log LOG) {
this(LOG, false);
@@ -390,26 +398,42 @@ public LogHelper(Log LOG, boolean isSilent) {
}
+ /**
+ * Returns the stream set through setOutStream when there is one; otherwise the current
+ * session's out stream, or System.out when there is no session or it has no out stream.
+ */
public PrintStream getOutStream() {
+ if (outStream != null) {
+ return outStream;
+ }
SessionState ss = SessionState.get();
return ((ss != null) && (ss.out != null)) ? ss.out : System.out;
}
+ /**
+ * Returns the stream set through setInfoStream when there is one; otherwise the current
+ * session's info stream, falling back to getErrStream().
+ */
public PrintStream getInfoStream() {
+ if (infoStream != null) {
+ return infoStream;
+ }
SessionState ss = SessionState.get();
return ((ss != null) && (ss.info != null)) ? ss.info : getErrStream();
}
+ /**
+ * Returns the stream set through setErrorStream when there is one; otherwise the current
+ * session's err stream, or System.err when there is no session or it has no err stream.
+ */
public PrintStream getErrStream() {
+ if (errorStream != null) {
+ return errorStream;
+ }
SessionState ss = SessionState.get();
return ((ss != null) && (ss.err != null)) ? ss.err : System.err;
}
+ /**
+ * Returns the stream set through setChildOutStream when there is one; otherwise the current
+ * session's childOut stream, or System.out when there is no session or it has none.
+ */
public PrintStream getChildOutStream() {
+ if (childOutStream != null) {
+ return childOutStream;
+ }
+
SessionState ss = SessionState.get();
return ((ss != null) && (ss.childOut != null)) ? ss.childOut : System.out;
}
+ /**
+ * Returns the stream set through setChildErrStream when there is one; otherwise the current
+ * session's childErr stream, or System.err when there is no session or it has none.
+ */
public PrintStream getChildErrStream() {
+ if (childErrStream != null) {
+ return childErrStream;
+ }
SessionState ss = SessionState.get();
return ((ss != null) && (ss.childErr != null)) ? ss.childErr : System.err;
}
@@ -439,14 +463,36 @@ public void printError(String error, String detail) {
getErrStream().println(error);
LOG.error(error + StringUtils.defaultString(detail));
}
+
+ // The setters below install per-helper overrides. Each matching getter returns the
+ // override when it is non-null and only otherwise consults the SessionState streams.
+
+ /** Overrides the stream returned by getChildErrStream(). */
+ public void setChildErrStream(PrintStream childErrStream) {
+ this.childErrStream = childErrStream;
+ }
+ /** Overrides the stream returned by getChildOutStream(). */
+ public void setChildOutStream(PrintStream childOutStream) {
+ this.childOutStream = childOutStream;
+ }
+ /** Overrides the stream returned by getInfoStream(). */
+ public void setInfoStream(PrintStream stream) {
+ infoStream = stream;
+ }
+ /** Overrides the stream returned by getOutStream(). */
+ public void setOutStream(PrintStream stream) {
+ outStream = stream;
+ }
+ /** Overrides the stream returned by getErrStream(). */
+ public void setErrorStream(PrintStream stream) {
+ errorStream = stream;
+ }
}
private static LogHelper _console;
+ private LogHelper sessionConsole;
/**
* initialize or retrieve console object for SessionState.
*/
public static LogHelper getConsole() {
+ SessionState ss = tss.get();
+ if (ss != null && ss.sessionConsole != null) {
+ return ss.sessionConsole;
+ }
+
if (_console == null) {
Log LOG = LogFactory.getLog("SessionState");
_console = new LogHelper(LOG);
diff --git service/src/java/org/apache/hive/service/cli/operation/Operation.java service/src/java/org/apache/hive/service/cli/operation/Operation.java
index 5d1dd5f..3fdc46f 100644
--- service/src/java/org/apache/hive/service/cli/operation/Operation.java
+++ service/src/java/org/apache/hive/service/cli/operation/Operation.java
@@ -18,10 +18,12 @@
package org.apache.hive.service.cli.operation;
import java.util.EnumSet;
+import java.io.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.service.cli.FetchOrientation;
import org.apache.hive.service.cli.HiveSQLException;
import org.apache.hive.service.cli.OperationHandle;
@@ -41,6 +43,9 @@
public static final Log LOG = LogFactory.getLog(Operation.class.getName());
public static final long DEFAULT_FETCH_MAX_ROWS = 100;
protected boolean hasResultSet;
+ protected SessionState.LogHelper console;
+ protected PrintStream opLogOut;
+ protected PrintStream opLogErr;
protected static final EnumSet DEFAULT_FETCH_ORIENTATION_SET =
EnumSet.of(FetchOrientation.FETCH_NEXT,FetchOrientation.FETCH_FIRST);
@@ -49,6 +54,7 @@ protected Operation(HiveSession parentSession, OperationType opType) {
super();
this.parentSession = parentSession;
opHandle = new OperationHandle(opType);
+ console = new SessionState.LogHelper(LOG);
}
public void setConfiguration(HiveConf configuration) {
@@ -153,4 +159,49 @@ protected void validateFetchOrientation(FetchOrientation orientation,
" is not supported for this resultset", "HY106");
}
}
+
+ /**
+ * Replaces the console helper of this operation; the constructor creates a default one
+ * backed by this class's LOG.
+ *
+ * @param log the LogHelper to use from now on
+ */
+ public void setConsole(SessionState.LogHelper log) {
+ console = log;
+ }
+
+ /**
+ * @return the console helper currently held by this operation
+ */
+ public SessionState.LogHelper getConsole() {
+ return console;
+ }
+
+  /**
+   * Opens the per-operation log files and points this operation's console at them.
+   *
+   * Does nothing unless the parent session has log redirection enabled. The files are
+   * named after the operation handle identifier with the suffixes ".out" and ".err" and
+   * live in the parent session's log directory. Info, out and child-out output go to the
+   * ".out" file; error and child-error output go to the ".err" file.
+   *
+   * If either file cannot be opened, the failure is logged, any stream already opened is
+   * closed again, and the console keeps its default streams.
+   */
+  protected void openLogStreams() {
+    if (!parentSession.isLogRedirectionEnabled()) {
+      return;
+    }
+
+    File sessionLogDir = parentSession.getSessionLogDir();
+    String operationId = opHandle.getHandleIdentifier().toString();
+    try {
+      opLogOut = new PrintStream(
+          new FileOutputStream(new File(sessionLogDir, operationId + ".out")), true, "UTF-8");
+      opLogErr = new PrintStream(
+          new FileOutputStream(new File(sessionLogDir, operationId + ".err")), true, "UTF-8");
+    } catch (IOException e) {
+      // Covers FileNotFoundException and UnsupportedEncodingException, both IOExceptions.
+      LOG.error("Error setting up log redirection for operation " + getHandle().toString(), e);
+      // Do not keep a half-open pair around: release whatever was opened.
+      if (opLogOut != null) {
+        opLogOut.close();
+        opLogOut = null;
+      }
+      opLogErr = null;
+      return;
+    }
+
+    // Redirect only once both files are known to be open.
+    console.setInfoStream(opLogOut);
+    console.setOutStream(opLogOut);
+    console.setErrorStream(opLogErr);
+    console.setChildErrStream(opLogErr);
+    console.setChildOutStream(opLogOut);
+
+    LOG.info("Created session log files in dir " + sessionLogDir.getAbsolutePath()
+        + " operation " + operationId);
+  }
+
+  /**
+   * Closes the per-operation log streams, if any were opened. Nothing happens when the
+   * parent session does not have log redirection enabled.
+   */
+  protected void closeLogStreams() {
+    if (!parentSession.isLogRedirectionEnabled()) {
+      return;
+    }
+
+    // Either stream may be missing when it was never opened.
+    for (PrintStream stream : new PrintStream[] {opLogOut, opLogErr}) {
+      if (stream != null) {
+        stream.close();
+      }
+    }
+  }
}
diff --git service/src/java/org/apache/hive/service/cli/operation/OperationManager.java service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
index bcdb67f..a1f2fed 100644
--- service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
+++ service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
@@ -18,10 +18,13 @@
package org.apache.hive.service.cli.operation;
+import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.service.AbstractService;
import org.apache.hive.service.cli.FetchOrientation;
@@ -49,7 +52,6 @@ public OperationManager() {
@Override
public synchronized void init(HiveConf hiveConf) {
this.hiveConf = hiveConf;
-
super.init(hiveConf);
}
diff --git service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index 296f8b3..741f624 100644
--- service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -74,6 +74,7 @@ public SQLOperation(HiveSession parentSession, String statement, Map opHandleSet = new HashSet();
+ private boolean isLogRedirectionEnabled;
+ protected File queryLogDir;
+ private PrintStream sessionOut;
+ private PrintStream sessionErr;
+ private File sessionLogDir;
public HiveSessionImpl(String username, String password, Map sessionConf) {
this.username = username;
@@ -97,6 +102,58 @@ public HiveSessionImpl(String username, String password, Map ses
SessionState.start(sessionState);
}
+  /**
+   * Creates this session's log directory under queryLogDir and routes the session console
+   * to the files session.out and session.err inside it.
+   *
+   * The directory is named after the session handle identifier. Redirection ends up
+   * disabled (isLogRedirectionEnabled() returns false) when the directory or either file
+   * cannot be created; in that case no stream is left open.
+   *
+   * @param queryLogDir top-level directory that holds one sub-directory per session
+   * @throws HiveSQLException declared by the interface; not thrown by this implementation
+   */
+  @Override
+  public void setupLogRedirection(File queryLogDir) throws HiveSQLException {
+    sessionLogDir = new File(queryLogDir, sessionHandle.getHandleIdentifier().toString());
+    isLogRedirectionEnabled = true;
+    if (!sessionLogDir.exists()) {
+      if (!sessionLogDir.mkdir()) {
+        LOG.warn("Query logs - unable to create session dir " + sessionLogDir.getAbsolutePath());
+        isLogRedirectionEnabled = false;
+        return;
+      }
+    } else {
+      LOG.warn("Session log directory already exists " + sessionLogDir.getAbsolutePath());
+    }
+
+    // Create the session.out and session.err files
+    try {
+      sessionOut = new PrintStream(
+          new FileOutputStream(new File(sessionLogDir, "session.out")), true, "UTF-8");
+      sessionErr = new PrintStream(
+          new FileOutputStream(new File(sessionLogDir, "session.err")), true, "UTF-8");
+    } catch (IOException e) {
+      // Covers FileNotFoundException and UnsupportedEncodingException, both IOExceptions.
+      isLogRedirectionEnabled = false;
+      LOG.error("Error creating session log file for " + sessionHandle.toString(), e);
+      // closeSessionLog() skips closing once redirection is disabled, so release here.
+      if (sessionOut != null) {
+        sessionOut.close();
+        sessionOut = null;
+      }
+      return;
+    }
+
+    SessionState.LogHelper console = new SessionState.LogHelper(LOG);
+    console.setErrorStream(sessionErr);
+    console.setInfoStream(sessionOut);
+    console.setOutStream(sessionOut);
+    sessionState.setSessionConsole(console);
+    LOG.info("Created session log files in " + sessionLogDir.getAbsolutePath());
+  }
+
+  /**
+   * Closes the session log streams and drops a marker file named SESSION_CLOSED_MARKER into
+   * the session log directory. Does nothing when log redirection is not enabled.
+   */
+  private void closeSessionLog() {
+    if (!isLogRedirectionEnabled) {
+      return;
+    }
+
+    for (PrintStream stream : new PrintStream[] {sessionOut, sessionErr}) {
+      if (stream != null) {
+        stream.close();
+      }
+    }
+
+    // Mark session as closed in log dir
+    File closedMarker = new File(sessionLogDir, SESSION_CLOSED_MARKER);
+    try {
+      closedMarker.createNewFile();
+    } catch (IOException e) {
+      LOG.error("Unable to create session closed marker", e);
+    }
+  }
+
public SessionManager getSessionManager() {
return sessionManager;
}
@@ -368,10 +425,11 @@ public void close() throws HiveSQLException {
hiveHist.closeStream();
}
sessionState.close();
- release();
} catch (IOException ioe) {
- release();
throw new HiveSQLException("Failure to close", ioe);
+ } finally {
+ release();
+ closeSessionLog();
}
}
@@ -442,4 +500,13 @@ public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException {
protected HiveSession getSession() {
return this;
}
+
+ /**
+ * @return true once setupLogRedirection has created the session log directory and files;
+ * false if it was never called or if creating them failed
+ */
+ @Override
+ public boolean isLogRedirectionEnabled() {
+ return isLogRedirectionEnabled;
+ }
+
+ /**
+ * @return the directory assigned by setupLogRedirection, or null when that method has not
+ * been called
+ */
+ public File getSessionLogDir() {
+ return sessionLogDir;
+ }
}
diff --git service/src/java/org/apache/hive/service/cli/session/SessionManager.java service/src/java/org/apache/hive/service/cli/session/SessionManager.java
index e262b72..b6bdf9e 100644
--- service/src/java/org/apache/hive/service/cli/session/SessionManager.java
+++ service/src/java/org/apache/hive/service/cli/session/SessionManager.java
@@ -18,14 +18,18 @@
package org.apache.hive.service.cli.session;
-import java.util.List;
-import java.util.Map;
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -51,6 +55,9 @@
private final OperationManager operationManager = new OperationManager();
private ThreadPoolExecutor backgroundOperationPool;
+ private ScheduledExecutorService logPurgerService;
+ private File queryLogDir;
+ private boolean isLogRedirectionEnabled;
public SessionManager() {
super("SessionManager");
@@ -71,6 +78,31 @@ public synchronized void init(HiveConf hiveConf) {
backgroundOperationPool = new ThreadPoolExecutor(backgroundPoolSize, backgroundPoolSize,
keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue(backgroundPoolQueueSize));
backgroundOperationPool.allowCoreThreadTimeOut(true);
+
+ if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_LOG_REDIRECTION_ENABLED)) {
+ queryLogDir = new File(hiveConf.getVar(ConfVars.HIVE_SERVER2_LOG_DIRECTORY));
+ isLogRedirectionEnabled = true;
+ if (queryLogDir.exists() && !queryLogDir.isDirectory()) {
+ LOG.warn("Query logs - not a directory! " + queryLogDir.getAbsolutePath());
+ isLogRedirectionEnabled = false;
+ }
+
+ if (!queryLogDir.exists()) {
+ if (!queryLogDir.mkdir()) {
+ LOG.warn("Query logs - unable to create query log directory - " + queryLogDir.getAbsolutePath());
+ isLogRedirectionEnabled = false;
+ }
+ }
+
+ if (isLogRedirectionEnabled) {
+ logPurgerService = Executors.newSingleThreadScheduledExecutor();
+ long purgeDelay = hiveConf.getLongVar(ConfVars.HIVE_SERVER2_LOG_PURGE_DELAY);
+ QueryLogPurger purger = new QueryLogPurger(queryLogDir, purgeDelay);
+ logPurgerService.scheduleWithFixedDelay(purger, 60, 60, TimeUnit.SECONDS);
+ LOG.info("Started log purger service");
+ }
+ }
+
addService(operationManager);
super.init(hiveConf);
}
@@ -93,6 +125,10 @@ public synchronized void stop() {
" seconds has been exceeded. RUNNING background operations will be shut down", e);
}
}
+
+ if (logPurgerService != null) {
+ logPurgerService.shutdownNow();
+ }
}
public SessionHandle openSession(String username, String password, Map sessionConf)
@@ -116,6 +152,9 @@ public SessionHandle openSession(String username, String password, Map= purgeDelay) {
+          // Walk everything below the session directory breadth-first, tracking the most
+          // recent modification time seen and purging stale directories on the way.
+          Queue<File> queue = new LinkedList<File>();
+          queue.add(session);
+
+          // Process entries until the queue drains.
+          while (!queue.isEmpty()) {
+            File f = queue.poll();
+            if (lastModifiedTime < f.lastModified()) {
+              lastModifiedTime = f.lastModified();
+            }
+            if (f.isDirectory()) {
+              // It's an operation log dir; its age is that of its newest direct entry
+              long operationLastLogTime = f.lastModified();
+              File[] logFiles = f.listFiles();
+              if (logFiles != null) {
+                for (File child : logFiles) {
+                  if (operationLastLogTime < child.lastModified()) {
+                    operationLastLogTime = child.lastModified();
+                  }
+                  queue.offer(child);
+                }
+              }
+              long idleTime = System.currentTimeMillis() - operationLastLogTime;
+              if (idleTime >= purgeDelay) {
+                // there hasn't been anything logged for this operation for a long time
+                // assume that this operation is done and delete its logs
+                deleteDirectory(f, idleTime, false);
+              }
+            }
+          }
+ }
+ inactiveSince = System.currentTimeMillis() - lastModifiedTime;
+ }
+
+ if (inactiveSince >= purgeDelay) {
+ deleteDirectory(session, inactiveSince, sessionClosed);
+ }
+ }
+ }
+ }
+ }
}
diff --git service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java
index 83f2535..cd66fdf 100644
--- service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java
+++ service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java
@@ -29,9 +29,13 @@
public class EmbeddedThriftBinaryCLIService extends ThriftBinaryCLIService {
+ /**
+ * Creates the embedded service with a freshly constructed HiveConf.
+ */
public EmbeddedThriftBinaryCLIService() {
+ this(null);
+ }
+
+ /**
+ * Creates the embedded service: wraps a new CLIService, marks it as embedded, then
+ * initializes and starts it.
+ *
+ * @param conf configuration to initialize the CLIService with; when null a new HiveConf
+ * is used
+ */
+ public EmbeddedThriftBinaryCLIService(HiveConf conf) {
super(new CLIService());
isEmbedded = true;
- cliService.init(new HiveConf());
+ cliService.init(conf == null ? new HiveConf() : conf);
cliService.start();
}
}
diff --git service/src/test/org/apache/hive/service/cli/CLIServiceTest.java service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
index 44d3130..f1426b1 100644
--- service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
+++ service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
@@ -121,8 +121,9 @@ public void getInfoTest() throws Exception {
@Test
public void testExecuteStatement() throws Exception {
HashMap confOverlay = new HashMap();
+ confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_LOG_REDIRECTION_ENABLED.varname, "true");
SessionHandle sessionHandle = client.openSession("tom", "password",
- new HashMap());
+ confOverlay);
assertNotNull(sessionHandle);
// Change lock manager, otherwise unit-test doesn't go through
diff --git service/src/test/org/apache/hive/service/cli/session/TestQueryLogPurger.java service/src/test/org/apache/hive/service/cli/session/TestQueryLogPurger.java
new file mode 100644
index 0000000..662eaa3
--- /dev/null
+++ service/src/test/org/apache/hive/service/cli/session/TestQueryLogPurger.java
@@ -0,0 +1,75 @@
+package org.apache.hive.service.cli.session;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Date;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Runs SessionManager.QueryLogPurger directly against a temporary query log directory that
+ * holds one session marked as closed and one that is not.
+ */
+public class TestQueryLogPurger {
+ /**
+ * Runs the purger right after the files are created, then again after sleeping for
+ * PURGE_PERIOD milliseconds, and expects both session directories to be gone by then.
+ */
+ @Test
+ public void testQueryLogPurger() throws Exception {
+ // Create a session dir and few log files inside it
+ File queryLogDir = new File(FileUtils.getTempDirectory(), "query_log_dir");
+ try {
+ queryLogDir.mkdir();
+ final int PURGE_PERIOD = 2000;
+ SessionManager.QueryLogPurger purger = new SessionManager.QueryLogPurger(queryLogDir, PURGE_PERIOD);
+ System.out.println("Creating test files " + new Date());
+ // Create a session with closed marker
+ createSessionFiles(queryLogDir, 1, true);
+ // Another session without closed marker
+ createSessionFiles(queryLogDir, 2, false);
+
+ System.out.println("Done creating test files " + new Date());
+ long before = System.currentTimeMillis();
+ System.out.println("purger run 1 " + new Date());
+ purger.run();
+ // Only check when the first run finished within the purge period; on a slow machine
+ // the files may already be old enough to be purged
+ if (System.currentTimeMillis() - before < PURGE_PERIOD) {
+ System.out.println("Should not delete in this run " + new Date());
+ // Expecting logs not to be deleted in this run
+ assertEquals(2, queryLogDir.listFiles().length);
+ }
+
+ Thread.sleep(PURGE_PERIOD);
+ System.out.println("purger run 2 " + new Date());
+ purger.run();
+ // Now they should be deleted since their age is at least PURGE_PERIOD
+ assertEquals(0, queryLogDir.listFiles().length);
+ }
+ finally {
+ FileUtils.forceDelete(queryLogDir);
+ System.out.println("deleted test files");
+ }
+ }
+
+ /**
+ * Creates a session directory named "test_session" + sessionId under queryLogDir, holding
+ * session.out, session.err and an "op1" sub-directory with op1.out and op1.err.
+ *
+ * @param queryLogDir directory in which the session directory is created
+ * @param sessionId number appended to the session directory name
+ * @param closeSession when true, the session closed marker file is created as well
+ */
+ private void createSessionFiles(File queryLogDir, int sessionId, boolean closeSession) throws Exception {
+ String session = "test_session" + sessionId;
+ String testOp = "op1";
+ File sessionDir = new File(queryLogDir, session);
+ sessionDir.mkdir();
+
+ File sessionOut = new File(sessionDir, "session.out");
+ sessionOut.createNewFile();
+ File sessionErr = new File(sessionDir, "session.err");
+ sessionErr.createNewFile();
+
+
+ File op1Dir = new File(sessionDir, testOp);
+ op1Dir.mkdir();
+
+ File op1Out = new File(op1Dir, testOp +".out");
+ op1Out.createNewFile();
+ File op1Err = new File(op1Dir, testOp + ".err");
+ op1Err.createNewFile();
+
+ if (closeSession) {
+ // Create session closed marker
+ File sessionClosedMarker = new File(sessionDir, HiveSession.SESSION_CLOSED_MARKER);
+ sessionClosedMarker.createNewFile();
+ }
+ System.out.println("Created session files " + sessionId);
+ }
+}