diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index fa3e048..b55177d 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -792,6 +792,11 @@
     HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME("hive.server2.async.exec.keepalive.time", 10),
 
+    HIVE_SERVER2_LOG_REDIRECTION_ENABLED("hive.server2.log.redirection.enabled", true),
+    HIVE_SERVER2_LOG_DIRECTORY("hive.server2.query.log.dir", "query_logs"),
+    // By default, logs will be purged after a week
+    HIVE_SERVER2_LOG_PURGE_DELAY("hive.server2.log.purge.delay", 7 * 1440 * 60 * 1000L),
+
     // HiveServer2 auth configuration
     HIVE_SERVER2_AUTHENTICATION("hive.server2.authentication", "NONE",
         new StringsValidator("NOSASL", "NONE", "LDAP", "KERBEROS", "CUSTOM")),
diff --git conf/hive-default.xml.template conf/hive-default.xml.template
index c61a0bb..f206c2b 100644
--- conf/hive-default.xml.template
+++ conf/hive-default.xml.template
@@ -2047,6 +2047,28 @@
 
+<property>
+  <name>hive.server2.log.redirection.enabled</name>
+  <value>true</value>
+  <description>Set this flag to false to disable HiveServer2 session and operation logs</description>
+</property>
+
+<property>
+  <name>hive.server2.query.log.dir</name>
+  <value>query_logs</value>
+  <description>Top level directory where session and operation level logs are stored if log redirection is
+  enabled.</description>
+</property>
+
+<property>
+  <name>hive.server2.log.purge.delay</name>
+  <value>604800000</value>
+  <description>Duration in milliseconds after which logs of closed or abandoned sessions and operations will be purged.
+  Default is one week</description>
+</property>
+
 <property>
   <name>hive.plan.serialization.format</name>
   <value>kryo</value>
diff --git itests/hive-unit/src/test/java/org/apache/hive/service/cli/TestEmbeddedThriftBinaryCLIService.java itests/hive-unit/src/test/java/org/apache/hive/service/cli/TestEmbeddedThriftBinaryCLIService.java
index ebda296..b8d693a 100644
--- itests/hive-unit/src/test/java/org/apache/hive/service/cli/TestEmbeddedThriftBinaryCLIService.java
+++ itests/hive-unit/src/test/java/org/apache/hive/service/cli/TestEmbeddedThriftBinaryCLIService.java
@@ -18,12 +18,17 @@
 package org.apache.hive.service.cli;
 
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hive.service.cli.thrift.EmbeddedThriftBinaryCLIService;
 import org.apache.hive.service.cli.thrift.ThriftCLIService;
 import org.apache.hive.service.cli.thrift.ThriftCLIServiceClient;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.UUID;
 
 /**
  * TestEmbeddedThriftBinaryCLIService.
@@ -55,5 +60,4 @@ public void setUp() throws Exception {
   public void tearDown() throws Exception {
     super.tearDown();
   }
-
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 62fc150..fbb1901 100644
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -114,7 +114,7 @@
   static final private String CLASS_NAME = Driver.class.getName();
   static final private Log LOG = LogFactory.getLog(CLASS_NAME);
-  static final private LogHelper console = new LogHelper(LOG);
+  private LogHelper console = new LogHelper(LOG);
 
   private static final Object compileMonitor = new Object();
 
@@ -136,6 +136,8 @@
   private int maxthreads;
   private static final int SLEEP_TIME = 2000;
   protected int tryCount = Integer.MAX_VALUE;
+  private DriverContext driverCxt;
+  private boolean logRedirectionEnabled;
 
   private String userName;
 
@@ -1470,6 +1472,14 @@ public void launchTask(Task tsk, String queryId, boolean
       console.printInfo("Launching Job " + cxt.getCurJobNo() + " out of " + jobs);
     }
     tsk.initialize(conf, plan, cxt);
+    if (logRedirectionEnabled) {
+      tsk.setLogRedirectionEnabled(true);
+      tsk.getConsole().setInfoStream(console.getInfoStream());
+      tsk.getConsole().setErrorStream(console.getErrStream());
+      tsk.getConsole().setOutStream(console.getOutStream());
+      tsk.getConsole().setChildErrStream(console.getChildErrStream());
+      tsk.getConsole().setChildOutStream(console.getChildOutStream());
+    }
 
     TaskResult tskRes = new TaskResult();
     TaskRunner tskRun = new TaskRunner(tsk, tskRes);
@@ -1663,4 +1673,15 @@ public String getErrorMsg() {
     return errorMessage;
   }
 
+  public void setConsole(LogHelper console) {
+    this.console = console;
+  }
+
+  public boolean isLogRedirectionEnabled() {
+    return logRedirectionEnabled;
+  }
+
+  public void setLogRedirectionEnabled(boolean isLogRedirectionEnabled) {
+    this.logRedirectionEnabled = isLogRedirectionEnabled;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index 655395c..6678c69 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -67,6 +67,7 @@
   protected int taskTag;
   private boolean isLocalMode =false;
   private boolean retryCmdWhenFail = false;
+  protected boolean isLogRedirectionEnabled;
 
   public static final int NO_TAG = 0;
   public static final int COMMON_JOIN = 1;
@@ -86,6 +87,14 @@
   protected String id;
   protected T work;
 
+  public void setConsole(LogHelper console) {
+    this.console = console;
+  }
+
+  public LogHelper getConsole() {
+    return console;
+  }
+
   public static enum FeedType {
     DYNAMIC_PARTITIONS, // list of dynamic partitions
   }
@@ -132,7 +141,6 @@ public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverC
       throw new RuntimeException(e);
     }
     this.driverContext = driverContext;
-    console = new LogHelper(LOG);
   }
 
@@ -510,5 +518,12 @@ void setException(Throwable ex) {
   public String toString() {
     return getId() + ":" + getType();
   }
+
+  public boolean isLogRedirectionEnabled() {
+    return isLogRedirectionEnabled;
+  }
+
+  public void setLogRedirectionEnabled(boolean isLogRedirectionEnabled) {
+    this.isLogRedirectionEnabled = isLogRedirectionEnabled;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
index a7e2253..1f1eb27 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
@@ -261,16 +261,16 @@ public int execute(DriverContext driverContext) {
       // Run ExecDriver in another JVM
       executor = Runtime.getRuntime().exec(cmdLine, env, new File(workDir));
 
-      CachingPrintStream errPrintStream =
-          new CachingPrintStream(SessionState.getConsole().getChildErrStream());
-
-      StreamPrinter outPrinter = new StreamPrinter(
-          executor.getInputStream(), null,
-          SessionState.getConsole().getChildOutStream());
-      StreamPrinter errPrinter = new StreamPrinter(
-          executor.getErrorStream(), null,
-          errPrintStream);
-
+      StreamPrinter outPrinter = null, errPrinter = null;
+      CachingPrintStream errPrintStream = null;
+      if (isLogRedirectionEnabled) {
+        outPrinter = new StreamPrinter(executor.getInputStream(), null, getConsole().getChildOutStream());
+        errPrintStream = new CachingPrintStream(getConsole().getChildErrStream());
+      } else {
+        outPrinter = new StreamPrinter(executor.getInputStream(), null, SessionState.getConsole().getChildOutStream());
+        errPrintStream = new CachingPrintStream(SessionState.getConsole().getChildErrStream());
+      }
+      errPrinter = new StreamPrinter(executor.getErrorStream(), null, errPrintStream);
 
       outPrinter.start();
       errPrinter.start();
diff --git ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 0684aac..dd765fb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -362,6 +362,9 @@ private static String makeSessionId() {
     return UUID.randomUUID().toString();
   }
 
+  public void setSessionConsole(LogHelper sessionConsole) {
+    this.sessionConsole = sessionConsole;
+  }
   /**
    * This class provides helper routines to emit informational and error
    * messages to the user and log4j files while obeying the current session's
@@ -379,6 +382,11 @@
     protected Log LOG;
     protected boolean isSilent;
 
+    private PrintStream childErrStream;
+    private PrintStream childOutStream;
+    private PrintStream errorStream;
+    private PrintStream outStream;
+    private PrintStream infoStream;
 
     public LogHelper(Log LOG) {
       this(LOG, false);
     }
@@ -390,26 +398,42 @@ public LogHelper(Log LOG, boolean isSilent) {
     }
 
     public PrintStream getOutStream() {
+      if (outStream != null) {
+        return outStream;
+      }
       SessionState ss = SessionState.get();
       return ((ss != null) && (ss.out != null)) ? ss.out : System.out;
     }
 
     public PrintStream getInfoStream() {
+      if (infoStream != null) {
+        return infoStream;
+      }
       SessionState ss = SessionState.get();
       return ((ss != null) && (ss.info != null)) ? ss.info : getErrStream();
     }
 
     public PrintStream getErrStream() {
+      if (errorStream != null) {
+        return errorStream;
+      }
       SessionState ss = SessionState.get();
       return ((ss != null) && (ss.err != null)) ? ss.err : System.err;
     }
 
     public PrintStream getChildOutStream() {
+      if (childOutStream != null) {
+        return childOutStream;
+      }
+
      SessionState ss = SessionState.get();
       return ((ss != null) && (ss.childOut != null)) ? ss.childOut : System.out;
     }
 
     public PrintStream getChildErrStream() {
+      if (childErrStream != null) {
+        return childErrStream;
+      }
       SessionState ss = SessionState.get();
       return ((ss != null) && (ss.childErr != null)) ? ss.childErr : System.err;
     }
 
@@ -439,14 +463,36 @@ public void printError(String error, String detail) {
       getErrStream().println(error);
       LOG.error(error + StringUtils.defaultString(detail));
     }
+
+    public void setChildErrStream(PrintStream childErrStream) {
+      this.childErrStream = childErrStream;
+    }
+    public void setChildOutStream(PrintStream childOutStream) {
+      this.childOutStream = childOutStream;
+    }
+    public void setInfoStream(PrintStream stream) {
+      infoStream = stream;
+    }
+    public void setOutStream(PrintStream stream) {
+      outStream = stream;
+    }
+    public void setErrorStream(PrintStream stream) {
+      errorStream = stream;
+    }
   }
 
   private static LogHelper _console;
+  private LogHelper sessionConsole;
 
   /**
    * initialize or retrieve console object for SessionState.
    */
   public static LogHelper getConsole() {
+    SessionState ss = tss.get();
+    if (ss != null && ss.sessionConsole != null) {
+      return ss.sessionConsole;
+    }
+
     if (_console == null) {
       Log LOG = LogFactory.getLog("SessionState");
       _console = new LogHelper(LOG);
diff --git service/src/java/org/apache/hive/service/cli/operation/Operation.java service/src/java/org/apache/hive/service/cli/operation/Operation.java
index 5d1dd5f..3fdc46f 100644
--- service/src/java/org/apache/hive/service/cli/operation/Operation.java
+++ service/src/java/org/apache/hive/service/cli/operation/Operation.java
@@ -18,10 +18,12 @@
 package org.apache.hive.service.cli.operation;
 
 import java.util.EnumSet;
+import java.io.*;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.service.cli.FetchOrientation;
 import org.apache.hive.service.cli.HiveSQLException;
 import org.apache.hive.service.cli.OperationHandle;
@@ -41,6 +43,9 @@
   public static final Log LOG = LogFactory.getLog(Operation.class.getName());
   public static final long DEFAULT_FETCH_MAX_ROWS = 100;
   protected boolean hasResultSet;
+  protected SessionState.LogHelper console;
+  protected PrintStream opLogOut;
+  protected PrintStream opLogErr;
 
   protected static final EnumSet<FetchOrientation> DEFAULT_FETCH_ORIENTATION_SET =
       EnumSet.of(FetchOrientation.FETCH_NEXT,FetchOrientation.FETCH_FIRST);
 
@@ -49,6 +54,7 @@ protected Operation(HiveSession parentSession, OperationType opType) {
     super();
     this.parentSession = parentSession;
     opHandle = new OperationHandle(opType);
+    console = new SessionState.LogHelper(LOG);
   }
 
   public void setConfiguration(HiveConf configuration) {
@@ -153,4 +159,49 @@ protected void validateFetchOrientation(FetchOrientation orientation,
           " is not supported for this resultset", "HY106");
     }
   }
+
+  public void setConsole(SessionState.LogHelper log) {
+    console = log;
+  }
+
+  public SessionState.LogHelper getConsole() {
+    return console;
+  }
+
+  protected void openLogStreams() {
+    if (parentSession.isLogRedirectionEnabled()) {
+      File sessionLogDir = parentSession.getSessionLogDir();
+      String operationId = opHandle.getHandleIdentifier().toString();
+      try {
+        opLogOut = new PrintStream(new FileOutputStream(new File(sessionLogDir, operationId + ".out")),
+            true, "UTF-8");
+        opLogErr = new PrintStream(new FileOutputStream(new File(sessionLogDir, operationId + ".err")),
+            true, "UTF-8");
+        console.setInfoStream(opLogOut);
+        console.setOutStream(opLogOut);
+        console.setErrorStream(opLogErr);
+        console.setChildErrStream(opLogErr);
+        console.setChildOutStream(opLogOut);
+      } catch (FileNotFoundException e) {
+        LOG.error("Error setting up log redirection for operation " +
+            getHandle().toString(), e);
+      } catch (UnsupportedEncodingException e) {
+        LOG.error("Error setting up log redirection, unable to create print stream", e);
+      }
+
+      LOG.info("Created session log files in dir " + parentSession.getSessionLogDir().getAbsolutePath()
+          + " operation " + operationId);
+    }
+  }
+
+  protected void closeLogStreams() {
+    if (parentSession.isLogRedirectionEnabled()) {
+      if (opLogOut != null) {
+        opLogOut.close();
+      }
+
+      if (opLogErr != null) {
+        opLogErr.close();
+      }
+    }
+  }
 }
diff --git service/src/java/org/apache/hive/service/cli/operation/OperationManager.java service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
index bcdb67f..a1f2fed 100644
--- service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
+++ service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
@@ -18,10 +18,13 @@
 package org.apache.hive.service.cli.operation;
 
+import java.io.File;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hive.service.AbstractService;
 import org.apache.hive.service.cli.FetchOrientation;
@@ -49,7 +52,6 @@ public OperationManager() {
   @Override
   public synchronized void init(HiveConf hiveConf) {
     this.hiveConf = hiveConf;
-
     super.init(hiveConf);
   }
diff --git service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index 296f8b3..741f624 100644
--- service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -74,6 +74,7 @@ public SQLOperation(HiveSession parentSession, String statement, Map
   Set<OperationHandle> opHandleSet = new HashSet<OperationHandle>();
+  private boolean isLogRedirectionEnabled;
+  protected File queryLogDir;
+  private PrintStream sessionOut;
+  private PrintStream sessionErr;
+  private File sessionLogDir;
 
   public HiveSessionImpl(String username, String password, Map<String, String> sessionConf) {
     this.username = username;
@@ -97,6 +102,58 @@
     SessionState.start(sessionState);
   }
 
+  @Override
+  public void setupLogRedirection(File queryLogDir) throws HiveSQLException {
+    // Create the session log dir and the session.out/session.err streams
+    sessionLogDir = new File(queryLogDir, sessionHandle.getHandleIdentifier().toString());
+    isLogRedirectionEnabled = true;
+    if (!sessionLogDir.exists()) {
+      if (!sessionLogDir.mkdir()) {
+        LOG.warn("Query logs - unable to create session dir " + sessionLogDir.getAbsolutePath());
+        isLogRedirectionEnabled = false;
+        return;
+      }
+    } else {
+      LOG.warn("Session log directory already exists " + sessionLogDir.getAbsolutePath());
+    }
+
+    try {
+      sessionOut = new PrintStream(new FileOutputStream(new File(sessionLogDir, "session.out")), true, "UTF-8");
+      sessionErr = new PrintStream(new FileOutputStream(new File(sessionLogDir, "session.err")), true, "UTF-8");
+      SessionState.LogHelper console = new SessionState.LogHelper(LOG);
+      console.setErrorStream(sessionErr);
+      console.setInfoStream(sessionOut);
+      console.setOutStream(sessionOut);
+      sessionState.setSessionConsole(console);
+      LOG.info("Created session log files in " + sessionLogDir.getAbsolutePath());
+    } catch (FileNotFoundException e) {
+      isLogRedirectionEnabled = false;
+      LOG.error("Error creating session log file for " + sessionHandle.toString(), e);
+    } catch (UnsupportedEncodingException e) {
+      isLogRedirectionEnabled = false;
+      LOG.error("Error creating session log file for " + sessionHandle.toString(), e);
+    }
+  }
+
+  private void closeSessionLog() {
+    if (isLogRedirectionEnabled) {
+      if (sessionOut != null) {
+        sessionOut.close();
+      }
+
+      if (sessionErr != null) {
+        sessionErr.close();
+      }
+      // Mark session as closed in log dir
+      File sessionClosed = new File(sessionLogDir, SESSION_CLOSED_MARKER);
+      try {
+        sessionClosed.createNewFile();
+      } catch (IOException e) {
+        LOG.error("Unable to create session closed marker", e);
+      }
+    }
+  }
+
   public SessionManager getSessionManager() {
     return sessionManager;
   }
@@ -368,10 +425,11 @@ public void close() throws HiveSQLException {
         hiveHist.closeStream();
       }
       sessionState.close();
-      release();
     } catch (IOException ioe) {
-      release();
       throw new HiveSQLException("Failure to close", ioe);
+    } finally {
+      release();
+      closeSessionLog();
     }
   }
@@ -442,4 +500,13 @@ public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException {
   protected HiveSession getSession() {
     return this;
   }
+
+  @Override
+  public boolean isLogRedirectionEnabled() {
+    return isLogRedirectionEnabled;
+  }
+
+  public File getSessionLogDir() {
+    return sessionLogDir;
+  }
 }
diff --git service/src/java/org/apache/hive/service/cli/session/SessionManager.java service/src/java/org/apache/hive/service/cli/session/SessionManager.java
index e262b72..b6bdf9e 100644
--- service/src/java/org/apache/hive/service/cli/session/SessionManager.java
+++ service/src/java/org/apache/hive/service/cli/session/SessionManager.java
@@ -18,14 +18,18 @@
 package org.apache.hive.service.cli.session;
 
-import java.util.List;
-import java.util.Map;
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -51,6 +55,9 @@
   private final OperationManager operationManager = new OperationManager();
   private ThreadPoolExecutor backgroundOperationPool;
+  private ScheduledExecutorService logPurgerService;
+  private File queryLogDir;
+  private boolean isLogRedirectionEnabled;
 
   public SessionManager() {
     super("SessionManager");
@@ -71,6 +78,31 @@ public synchronized void init(HiveConf hiveConf) {
     backgroundOperationPool = new ThreadPoolExecutor(backgroundPoolSize, backgroundPoolSize,
         keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(backgroundPoolQueueSize));
     backgroundOperationPool.allowCoreThreadTimeOut(true);
+
+    if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_LOG_REDIRECTION_ENABLED)) {
+      queryLogDir = new File(hiveConf.getVar(ConfVars.HIVE_SERVER2_LOG_DIRECTORY));
+      isLogRedirectionEnabled = true;
+      if (queryLogDir.exists() && !queryLogDir.isDirectory()) {
+        LOG.warn("Query logs - not a directory! " + queryLogDir.getAbsolutePath());
+        isLogRedirectionEnabled = false;
+      }
+
+      if (!queryLogDir.exists()) {
+        if (!queryLogDir.mkdir()) {
+          LOG.warn("Query logs - unable to create query log directory - " + queryLogDir.getAbsolutePath());
+          isLogRedirectionEnabled = false;
+        }
+      }
+
+      if (isLogRedirectionEnabled) {
+        logPurgerService = Executors.newSingleThreadScheduledExecutor();
+        long purgeDelay = hiveConf.getLongVar(ConfVars.HIVE_SERVER2_LOG_PURGE_DELAY);
+        QueryLogPurger purger = new QueryLogPurger(queryLogDir, purgeDelay);
+        logPurgerService.scheduleWithFixedDelay(purger, 60, 60, TimeUnit.SECONDS);
+        LOG.info("Started log purger service");
+      }
+    }
+
+    addService(operationManager);
     super.init(hiveConf);
   }
@@ -93,6 +125,10 @@ public synchronized void stop() {
           " seconds has been exceeded. RUNNING background operations will be shut down", e);
       }
     }
+
+    if (logPurgerService != null) {
+      logPurgerService.shutdownNow();
+    }
   }
 
   public SessionHandle openSession(String username, String password, Map<String, String> sessionConf)
@@ -116,6 +152,9 @@ public SessionHandle openSession(String username, String password, Map= purgeDelay) {
+        Queue<File> queue = new LinkedList<File>();
+        queue.add(session);
+
+        while (!queue.isEmpty()) {
+          File f = queue.poll();
+          if (lastModifiedTime < f.lastModified()) {
+            lastModifiedTime = f.lastModified();
+          }
+          if (f.isDirectory()) {
+            // It's an operation log dir
+            long operationLastLogTime = f.lastModified();
+            File[] logFiles = f.listFiles();
+            if (logFiles != null) {
+              for (File child : logFiles) {
+                if (operationLastLogTime < child.lastModified()) {
+                  operationLastLogTime = child.lastModified();
+                }
+                queue.offer(child);
+              }
+            }
+            if (System.currentTimeMillis() - operationLastLogTime >= purgeDelay) {
+              // there hasn't been anything logged for this operation for a long time
+              // assume that this operation is done and delete its logs
+              deleteDirectory(f, System.currentTimeMillis() - operationLastLogTime, false);
+            }
+          }
+        }
+      }
+      inactiveSince = System.currentTimeMillis() - lastModifiedTime;
+    }
+
+    if (inactiveSince >= purgeDelay) {
+      deleteDirectory(session, inactiveSince, sessionClosed);
+    }
+  }
+}
+}
 }
diff --git service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java
index 83f2535..cd66fdf 100644
--- service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java
+++ service/src/java/org/apache/hive/service/cli/thrift/EmbeddedThriftBinaryCLIService.java
@@ -29,9 +29,13 @@
 public class EmbeddedThriftBinaryCLIService extends ThriftBinaryCLIService {
 
   public EmbeddedThriftBinaryCLIService() {
+    this(null);
+  }
+
+  public EmbeddedThriftBinaryCLIService(HiveConf conf) {
     super(new CLIService());
     isEmbedded = true;
-    cliService.init(new HiveConf());
+    cliService.init(conf == null ? new HiveConf() : conf);
     cliService.start();
   }
 }
diff --git service/src/test/org/apache/hive/service/cli/CLIServiceTest.java service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
index 44d3130..f1426b1 100644
--- service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
+++ service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
@@ -121,8 +121,9 @@ public void getInfoTest() throws Exception {
   @Test
   public void testExecuteStatement() throws Exception {
     HashMap<String, String> confOverlay = new HashMap<String, String>();
+    confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_LOG_REDIRECTION_ENABLED.varname, "true");
     SessionHandle sessionHandle = client.openSession("tom", "password",
-        new HashMap<String, String>());
+        confOverlay);
     assertNotNull(sessionHandle);
 
     // Change lock manager, otherwise unit-test doesn't go through
diff --git service/src/test/org/apache/hive/service/cli/session/TestQueryLogPurger.java service/src/test/org/apache/hive/service/cli/session/TestQueryLogPurger.java
new file mode 100644
index 0000000..662eaa3
--- /dev/null
+++ service/src/test/org/apache/hive/service/cli/session/TestQueryLogPurger.java
@@ -0,0 +1,75 @@
+package org.apache.hive.service.cli.session;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Date;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestQueryLogPurger {
+  @Test
+  public void testQueryLogPurger() throws Exception {
+    // Create a session dir and a few log files inside it
+    File queryLogDir = new File(FileUtils.getTempDirectory(), "query_log_dir");
+    try {
+      queryLogDir.mkdir();
+      final int PURGE_PERIOD = 2000;
+      SessionManager.QueryLogPurger purger = new SessionManager.QueryLogPurger(queryLogDir, PURGE_PERIOD);
+      System.out.println("Creating test files " + new Date());
+      // Create a session with closed marker
+      createSessionFiles(queryLogDir, 1, true);
+      // Another session without closed marker
+      createSessionFiles(queryLogDir, 2, false);
+
+      System.out.println("Done creating test files " + new Date());
+      long before = System.currentTimeMillis();
+      System.out.println("purger run 1 " + new Date());
+      purger.run();
+      if (System.currentTimeMillis() - before < PURGE_PERIOD) {
+        System.out.println("Should not delete in this run " + new Date());
+        // Expecting logs not to be deleted in this run
+        assertEquals(2, queryLogDir.listFiles().length);
+      }
+
+      Thread.sleep(PURGE_PERIOD);
+      System.out.println("purger run 2 " + new Date());
+      purger.run();
+      // Now they should be deleted since their age exceeds PURGE_PERIOD
+      assertEquals(0, queryLogDir.listFiles().length);
+    }
+    finally {
+      FileUtils.forceDelete(queryLogDir);
+      System.out.println("deleted test files");
+    }
+  }
+
+  private void createSessionFiles(File queryLogDir, int sessionId, boolean closeSession) throws Exception {
+    String session = "test_session" + sessionId;
+    String testOp = "op1";
+    File sessionDir = new File(queryLogDir, session);
+    sessionDir.mkdir();
+
+    File sessionOut = new File(sessionDir, "session.out");
+    sessionOut.createNewFile();
+    File sessionErr = new File(sessionDir, "session.err");
+    sessionErr.createNewFile();
+
+    File op1Dir = new File(sessionDir, testOp);
+    op1Dir.mkdir();
+
+    File op1Out = new File(op1Dir, testOp + ".out");
+    op1Out.createNewFile();
+    File op1Err = new File(op1Dir, testOp + ".err");
+    op1Err.createNewFile();
+
+    if (closeSession) {
+      // Create session closed marker
+      File sessionClosedMarker = new File(sessionDir, HiveSession.SESSION_CLOSED_MARKER);
+      sessionClosedMarker.createNewFile();
+    }
+    System.out.println("Created session files " + sessionId);
+  }
+}
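Note (not part of the patch): a minimal, illustrative sketch of how the new settings and the EmbeddedThriftBinaryCLIService(HiveConf) constructor added above might be exercised from an embedded client. The log directory path, credentials, and the SHOW TABLES statement are placeholders; the property constants come from the HiveConf hunk, and the expected on-disk layout (one <session-id> directory per session containing session.out/session.err plus <operation-id>.out/.err files) follows from HiveSessionImpl.setupLogRedirection and Operation.openLogStreams.

import java.util.HashMap;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.service.cli.OperationHandle;
import org.apache.hive.service.cli.SessionHandle;
import org.apache.hive.service.cli.thrift.EmbeddedThriftBinaryCLIService;
import org.apache.hive.service.cli.thrift.ThriftCLIServiceClient;

public class QueryLogRedirectionExample {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    // Enable session/operation log redirection (on by default in this patch).
    conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_LOG_REDIRECTION_ENABLED, true);
    // Placeholder path; the default is the relative "query_logs" directory.
    conf.setVar(HiveConf.ConfVars.HIVE_SERVER2_LOG_DIRECTORY, "/tmp/hs2_query_logs");
    // Purge logs of closed/abandoned sessions after one hour instead of the one-week default.
    conf.setLongVar(HiveConf.ConfVars.HIVE_SERVER2_LOG_PURGE_DELAY, 60 * 60 * 1000L);

    // New constructor added by this patch; the no-arg constructor now delegates to it.
    EmbeddedThriftBinaryCLIService service = new EmbeddedThriftBinaryCLIService(conf);
    ThriftCLIServiceClient client = new ThriftCLIServiceClient(service);

    // Each session gets its own directory under the query log dir; each operation
    // writes <operation-id>.out/.err files inside that session directory.
    SessionHandle session = client.openSession("tom", "password", new HashMap<String, String>());
    OperationHandle op = client.executeStatement(session, "SHOW TABLES", new HashMap<String, String>());
    client.closeOperation(op);
    client.closeSession(session);
  }
}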