Index: repl/src/main/scala/org/apache/livy/repl/ReplDriver.scala
===================================================================
--- repl/src/main/scala/org/apache/livy/repl/ReplDriver.scala	(revision 8c7f03b4a8c5fae73c14bac7d97c62cc0fd6241d)
+++ repl/src/main/scala/org/apache/livy/repl/ReplDriver.scala	(revision 6d2867261a75d30a44101b29d348d5957ecd8934)
@@ -20,6 +20,7 @@
 import scala.concurrent.Await
 import scala.concurrent.duration.Duration
 
+import com.google.common.base.Joiner
 import io.netty.channel.ChannelHandlerContext
 import org.apache.spark.SparkConf
 
@@ -66,6 +67,19 @@
     session.complete(EOLUtils.convertToSystemEOL(msg.code), msg.codeType, msg.cursor)
   }
 
+  def handle(ctx: ChannelHandlerContext, msg: BaseProtocol.GetJobLog): ReplJobResults = {
+    val statements = session.statements.get(msg.id.toInt)
+    val output = session.readLog(msg.id.toInt, msg.size)
+      .map(lines => Joiner.on("\n").join(lines.iterator()))
+      .getOrElse("")
+    val resultStatements = statements.map { item =>
+      val newStat = new Statement(item.id, item.code, item.state.get(), output)
+      newStat.updateProgress(item.progress)
+      newStat
+    }.map(Array(_)).getOrElse(Array.empty[Statement])
+    new ReplJobResults(resultStatements)
+  }
+
   /**
    * Return statement results. Results are sorted by statement id.
    */
Index: repl/src/main/scala/org/apache/livy/repl/Session.scala
===================================================================
--- repl/src/main/scala/org/apache/livy/repl/Session.scala	(revision 8c7f03b4a8c5fae73c14bac7d97c62cc0fd6241d)
+++ repl/src/main/scala/org/apache/livy/repl/Session.scala	(revision 6d2867261a75d30a44101b29d348d5957ecd8934)
@@ -17,6 +17,7 @@
 
 package org.apache.livy.repl
 
+import java.util
 import java.util.{LinkedHashMap => JLinkedHashMap}
 import java.util.Map.Entry
 import java.util.concurrent.Executors
@@ -28,6 +29,8 @@
 import scala.concurrent.duration._
 import scala.util.control.NonFatal
 
+import org.apache.hadoop.hive.ql.session.OperationLog
+import org.apache.log4j.Logger
 import org.apache.spark.{SparkConf, SparkContext}
 import org.json4s.jackson.JsonMethods.{compact, render}
 import org.json4s.DefaultFormats
@@ -36,6 +39,7 @@
 import org.apache.livy.Logging
 import org.apache.livy.rsc.RSCConf
 import org.apache.livy.rsc.driver.{SparkEntries, Statement, StatementState}
+import org.apache.livy.rsc.operation.{LogDivertAppender, LogManager}
 import org.apache.livy.sessions._
 
 object Session {
@@ -67,12 +71,18 @@
 
   private var _state: SessionState = SessionState.NotStarted
 
+  private val logManager = new LogManager(livyConf)
+
   // Number of statements kept in driver's memory
   private val numRetainedStatements = livyConf.getInt(RSCConf.Entry.RETAINED_STATEMENTS)
 
   private val _statements = new JLinkedHashMap[Int, Statement] {
     protected override def removeEldestEntry(eldest: Entry[Int, Statement]): Boolean = {
-      size() > numRetainedStatements
+      val shouldRemove = size() > numRetainedStatements
+      if (shouldRemove) {
+        statementRemovedCallback(eldest.getKey)
+      }
+      shouldRemove
     }
   }.asScala
 
@@ -132,7 +142,9 @@
       interpGroup.synchronized {
         interpGroup.put(Spark, sparkInterp)
       }
-
+      val ap = new LogDivertAppender(logManager,
+        OperationLog.getLoggingLevel(livyConf.get(RSCConf.Entry.LOGGING_OPERATION_LEVEL)))
+      Logger.getRootLogger.addAppender(ap)
       changeState(SessionState.Idle)
       entries
     }(interpreterExecutor)
@@ -147,6 +159,10 @@
     _statements.toMap
   }
 
+  def readLog(statementId: Int, maxRows: Long): Option[util.List[String]] = {
+    Option(logManager.readLog(statementId.toString, maxRows))
+  }
+
   def execute(code: String, codeType: String = null): Int = {
     val tpe = if (codeType != null) {
       Kind(codeType)
@@ -227,6 +243,7 @@
     interpreterExecutor.shutdown()
     cancelExecutor.shutdown()
     interpGroup.values.foreach(_.close())
+    logManager.shutdown()
   }
 
   /**
@@ -268,9 +285,9 @@
         changeState(SessionState.Idle)
       }
     }
-
     val resultInJson = interp.map { i =>
       try {
+        logManager.registerOperationLog(executionCount.toString)
         i.execute(code) match {
           case Interpreter.ExecuteSuccess(data) =>
             transitToIdle()
@@ -317,6 +334,8 @@
           (ENAME -> f"Internal Error: ${e.getClass.getName}") ~
           (EVALUE -> e.getMessage) ~
           (TRACEBACK -> Seq.empty[String])
+      } finally {
+        logManager.removeCurrentOperationLog()
       }
     }.getOrElse {
       transitToIdle()
@@ -356,4 +375,8 @@
   private def statementIdToJobGroup(statementId: Int): String = {
     statementId.toString
   }
+
+  def statementRemovedCallback(statementId: Int): Unit = {
+    logManager.removeLog(statementId.toString)
+  }
 }
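For illustration, a minimal sketch (not part of the patch) of how the new Session API is
meant to be driven, assuming a started Session whose RSCConf has logging.operation.enabled
turned on (see the SparkSessionSpec change below):

    val stmtId = session.execute("sc.parallelize(1 to 10).count()")
    // readLog returns up to maxRows lines of the statement's diverted log output
    // wrapped in Some, or None when no operation log exists for that statement.
    val logLines: Option[java.util.List[String]] = session.readLog(stmtId, 1000L)
    logLines.foreach(lines => println(String.join("\n", lines)))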
Index: rsc/src/main/java/org/apache/livy/rsc/BaseProtocol.java
===================================================================
--- rsc/src/main/java/org/apache/livy/rsc/BaseProtocol.java	(revision 8c7f03b4a8c5fae73c14bac7d97c62cc0fd6241d)
+++ rsc/src/main/java/org/apache/livy/rsc/BaseProtocol.java	(revision bf07e1d60a5a1ff0a50a3faf7fca52fcd0322e36)
@@ -202,6 +202,24 @@
     }
   }
 
+  public static class GetJobLog {
+    public boolean allResults;
+    public final String id;
+    public Long size;
+
+    public GetJobLog(String id, Long size) {
+      this.allResults = false;
+      this.id = id;
+      this.size = size;
+    }
+
+    public GetJobLog(String id) {
+      this.allResults = true;
+      this.id = id;
+      this.size = null;
+    }
+  }
+
   public static class ReplCompleteRequest {
     public final String code;
     public final String codeType;
Index: rsc/src/main/java/org/apache/livy/rsc/RSCClient.java
===================================================================
--- rsc/src/main/java/org/apache/livy/rsc/RSCClient.java	(revision 8c7f03b4a8c5fae73c14bac7d97c62cc0fd6241d)
+++ rsc/src/main/java/org/apache/livy/rsc/RSCClient.java	(revision bf07e1d60a5a1ff0a50a3faf7fca52fcd0322e36)
@@ -302,6 +302,10 @@
     return deferredCall(new BaseProtocol.GetReplJobResults(), ReplJobResults.class);
   }
 
+  public Future<ReplJobResults> getJobLog(String id, Long size) throws Exception {
+    return deferredCall(new BaseProtocol.GetJobLog(id, size), ReplJobResults.class);
+  }
+
   public Future<ReplCompleteResponse> completeReplCode(String code, String codeType,
       int cursor) throws Exception {
     return deferredCall(new BaseProtocol.ReplCompleteRequest(code, codeType, cursor),
Index: rsc/src/main/java/org/apache/livy/rsc/RSCConf.java
===================================================================
--- rsc/src/main/java/org/apache/livy/rsc/RSCConf.java	(revision 8c7f03b4a8c5fae73c14bac7d97c62cc0fd6241d)
+++ rsc/src/main/java/org/apache/livy/rsc/RSCConf.java	(revision bf07e1d60a5a1ff0a50a3faf7fca52fcd0322e36)
@@ -43,6 +43,7 @@
   CLIENT_IN_PROCESS("client.do-not-use.run-driver-in-process", false),
   CLIENT_SHUTDOWN_TIMEOUT("client.shutdown-timeout", "10s"),
   DRIVER_CLASS("driver-class", null),
+  SESSION_ID("session.id", null),
   SESSION_KIND("session.kind", null),
 
   LIVY_JARS("jars", null),
@@ -80,6 +81,11 @@
   RETAINED_STATEMENTS("retained-statements", 100),
   RETAINED_SHARE_VARIABLES("retained.share-variables", 100),
 
+  // Operation log
+  LOGGING_OPERATION_ENABLED("logging.operation.enabled", false),
+  LOGGING_OPERATION_LEVEL("logging.operation.level", "verbose"),
+  LOGGING_OPERATION_LOG_LOCATION("logging.operation.log.location", null),
+
   // Number of result rows to get for SQL Interpreters.
   SQL_NUM_ROWS("sql.num-rows", 1000);
 
@@ -189,6 +195,7 @@
     JOB_CANCEL_TIMEOUT("job_cancel.timeout", "0.4"),
     RETAINED_STATEMENTS("retained_statements", "0.4");
 
+
     private final String key;
     private final String version;
     private final String deprecationMessage;
Index: rsc/src/main/java/org/apache/livy/rsc/operation/LogDivertAppender.java
===================================================================
--- rsc/src/main/java/org/apache/livy/rsc/operation/LogDivertAppender.java	(revision 6d2867261a75d30a44101b29d348d5957ecd8934)
+++ rsc/src/main/java/org/apache/livy/rsc/operation/LogDivertAppender.java	(revision 6d2867261a75d30a44101b29d348d5957ecd8934)
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.rsc.operation;
+
+import java.io.CharArrayWriter;
+import java.util.Enumeration;
+import java.util.regex.Pattern;
+
+import com.google.common.base.Joiner;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.session.OperationLog;
+import org.apache.hadoop.hive.ql.session.OperationLog.LoggingLevel;
+import org.apache.log4j.*;
+import org.apache.log4j.spi.Filter;
+import org.apache.log4j.spi.LoggingEvent;
+
+/**
+ * An Appender that diverts logs from individual threads to the OperationLog they belong to.
+ */
+public class LogDivertAppender extends WriterAppender {
+  private static final Logger LOG = Logger.getLogger(LogDivertAppender.class.getName());
+  private final LogManager operationManager;
+  public static final Layout defaultVerboseLayout = new PatternLayout(
+      "%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n");
+  public static final Layout defaultNonVerboseLayout = new PatternLayout(
+      "%-5p : %m%n");
+  private boolean isVerbose;
+  private Layout verboseLayout;
+
+  /**
+   * A log filter that filters messages coming from loggers with the given names.
+   * It can be used as a whitelist filter or a blacklist filter.
+   * We apply the blacklist filter to the loggers used by the log diversion itself,
+   * so that they don't generate more logs for themselves when they process logs.
+   * The whitelist filter is used for less verbose log collection.
+   */
+  private static class NameFilter extends Filter {
+    private Pattern namePattern;
+    private LoggingLevel loggingMode;
+    private LogManager operationManager;
+
+    /* Patterns that are excluded in verbose logging level.
+     * Filter out messages coming from log processing classes, or we'll run an infinite loop.
+     */
+    private static final Pattern verboseExcludeNamePattern = Pattern.compile(Joiner.on("|")
+        .join(new String[] {LOG.getName(), OperationLog.class.getName(),
+            LogManager.class.getName()}));
+
+    /* Patterns that are included in execution logging level.
+     * In execution mode, show only select logger messages.
+     */
+    private static final Pattern executionIncludeNamePattern = Pattern.compile(Joiner.on("|")
+        .join(new String[] {"org.apache.hadoop.mapreduce.JobSubmitter",
+            "org.apache.hadoop.mapreduce.Job", "SessionState", Task.class.getName(),
+            "org.apache.hadoop.hive.ql.exec.spark.status.SparkJobMonitor"}));
+
+    /* Patterns that are included in performance logging level.
+     * In performance mode, show execution and performance logger messages.
+     */
+    private static final Pattern performanceIncludeNamePattern = Pattern.compile(
+        executionIncludeNamePattern.pattern() + "|" + PerfLogger.class.getName());
+
+    private void setCurrentNamePattern(LoggingLevel mode) {
+      if (mode == LoggingLevel.VERBOSE) {
+        this.namePattern = verboseExcludeNamePattern;
+      } else if (mode == LoggingLevel.EXECUTION) {
+        this.namePattern = executionIncludeNamePattern;
+      } else if (mode == LoggingLevel.PERFORMANCE) {
+        this.namePattern = performanceIncludeNamePattern;
+      }
+    }
+
+    NameFilter(LoggingLevel loggingMode, LogManager op) {
+      this.operationManager = op;
+      this.loggingMode = loggingMode;
+      setCurrentNamePattern(loggingMode);
+    }
+
+    @Override
+    public int decide(LoggingEvent ev) {
+      OperationLog log = LogManager.getOperationLogByThread();
+      boolean excludeMatches = (loggingMode == LoggingLevel.VERBOSE);
+
+      if (log == null) {
+        return Filter.DENY;
+      }
+
+      LoggingLevel currentLoggingMode = log.getOpLoggingLevel();
+      // If logging is disabled, deny everything.
+      if (currentLoggingMode == LoggingLevel.NONE) {
+        return Filter.DENY;
+      }
+      // Look at the current session's setting
+      // and set the pattern and excludeMatches accordingly.
+      if (currentLoggingMode != loggingMode) {
+        loggingMode = currentLoggingMode;
+        setCurrentNamePattern(loggingMode);
+      }
+
+      boolean isMatch = namePattern.matcher(ev.getLoggerName()).matches();
+
+      if (excludeMatches == isMatch) {
+        // Deny if this is the blacklist filter (excludeMatches = true) and the name matched,
+        // or if this is the whitelist filter and it didn't match.
+        return Filter.DENY;
+      }
+      return Filter.NEUTRAL;
+    }
+  }
+
+  /** This is where the log message will go to. */
+  private final CharArrayWriter writer = new CharArrayWriter();
+
+  private void setLayout(boolean isVerbose, Layout lo) {
+    if (isVerbose) {
+      if (lo == null) {
+        lo = defaultVerboseLayout;
+        LOG.info("Cannot find a Layout from a ConsoleAppender. Using default Layout pattern.");
+      }
+    } else {
+      lo = defaultNonVerboseLayout;
+    }
+    setLayout(lo);
+  }
+
+  private void initLayout(boolean isVerbose) {
+    // There should be a ConsoleAppender. Copy its Layout.
+    Logger root = Logger.getRootLogger();
+    Layout layout = null;
+
+    Enumeration<?> appenders = root.getAllAppenders();
+    while (appenders.hasMoreElements()) {
+      Appender ap = (Appender) appenders.nextElement();
+      if (ap.getClass().equals(ConsoleAppender.class)) {
+        layout = ap.getLayout();
+        break;
+      }
+    }
+    setLayout(isVerbose, layout);
+  }
+
+  public LogDivertAppender(LogManager operationManager,
+      LoggingLevel loggingMode) {
+    isVerbose = (loggingMode == LoggingLevel.VERBOSE);
+    initLayout(isVerbose);
+    setWriter(writer);
+    setName("LogDivertAppender");
+    this.operationManager = operationManager;
+    this.verboseLayout = isVerbose ? layout : defaultVerboseLayout;
+    addFilter(new NameFilter(loggingMode, operationManager));
+  }
+
+  @Override
+  public void doAppend(LoggingEvent event) {
+    OperationLog log = LogManager.getOperationLogByThread();
+
+    // Set the current layout depending on the verbose/non-verbose mode.
+    if (log != null) {
+      boolean isCurrModeVerbose = (log.getOpLoggingLevel() == LoggingLevel.VERBOSE);
+
+      // If there is a logging level change from verbose to non-verbose or vice versa since
+      // the last subAppend call, change the layout to preserve consistency.
+      if (isCurrModeVerbose != isVerbose) {
+        isVerbose = isCurrModeVerbose;
+        setLayout(isVerbose, verboseLayout);
+      }
+    }
+    super.doAppend(event);
+  }
+
+  /**
+   * Overrides WriterAppender.subAppend(), which does the real logging. No need
+   * to worry about concurrency since log4j calls this synchronously.
+   */
+  @Override
+  protected void subAppend(LoggingEvent event) {
+    super.subAppend(event);
+    // That should've gone into our writer. Notify the LogContext.
+    String logOutput = writer.toString();
+    writer.reset();
+
+    OperationLog log = LogManager.getOperationLogByThread();
+    if (log == null) {
+      LOG.debug(" ---+++=== Dropped log event from thread " + event.getThreadName());
+      return;
+    }
+    log.writeOperationLog(logOutput);
+  }
+}
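The appender above mirrors HiveServer2's LogDivertAppender. As a minimal standalone sketch
of the underlying idea — a log4j 1.x WriterAppender whose subAppend() hands each rendered
event to whatever sink the emitting thread registered — assuming only log4j on the
classpath; the class and method names here are illustrative, not Livy's:

    import java.io.CharArrayWriter

    import org.apache.log4j.{Logger, PatternLayout, WriterAppender}
    import org.apache.log4j.spi.LoggingEvent

    object DivertSketch {
      // Stand-in for OperationLog.setCurrentOperationLog/getCurrentOperationLog.
      private val current = new ThreadLocal[StringBuilder]

      class Divert extends WriterAppender {
        private val buf = new CharArrayWriter
        setWriter(buf)
        setLayout(new PatternLayout("%p %c{1}: %m%n"))

        // log4j calls subAppend synchronously, so the buffer hand-off is safe.
        override protected def subAppend(event: LoggingEvent): Unit = {
          super.subAppend(event)
          val rendered = buf.toString
          buf.reset()
          // Route to the sink registered by the emitting thread; drop otherwise.
          Option(current.get()).foreach(_.append(rendered))
        }
      }

      def main(args: Array[String]): Unit = {
        Logger.getRootLogger.addAppender(new Divert)
        val sink = new StringBuilder   // like registerOperationLog(statementId)
        current.set(sink)
        Logger.getLogger("demo").info("captured for this statement only")
        current.remove()               // like removeCurrentOperationLog()
        print(sink)
      }
    }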
Index: rsc/src/main/java/org/apache/livy/rsc/operation/LogManager.java
===================================================================
--- rsc/src/main/java/org/apache/livy/rsc/operation/LogManager.java	(revision 6d2867261a75d30a44101b29d348d5957ecd8934)
+++ rsc/src/main/java/org/apache/livy/rsc/operation/LogManager.java	(revision 6d2867261a75d30a44101b29d348d5957ecd8934)
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.rsc.operation;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.session.OperationLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.livy.rsc.RSCConf;
+
+/**
+ * Manages per-statement OperationLog instances for a REPL session.
+ */
+public class LogManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LogManager.class);
+
+  public static final String PREFIX = "livy-";
+
+  private String rootDirPath = null;
+
+  private boolean isOperationLogEnabled = false;
+
+  private HiveConf hiveConf;
+
+  private final Map<String, OperationLog> logs =
+      Collections.synchronizedMap(new LinkedHashMap<String, OperationLog>());
+
+  public List<String> readLog(String statementId, Long maxRows) throws SQLException {
+    OperationLog operationLog = getLog(statementId);
+    if (operationLog != null) {
+      return operationLog.readOperationLog(false, maxRows);
+    }
+    return null;
+  }
+
+  public void removeLog(String statementId) {
+    OperationLog operationLog = logs.get(statementId);
+    if (operationLog != null) {
+      operationLog.close();
+    }
+    logs.remove(statementId);
+  }
+
+  public OperationLog getLog(String statementId) {
+    return logs.get(statementId);
+  }
+
+  public LogManager(RSCConf rscConf) {
+    initialize(rscConf);
+  }
+
+  private void initialize(RSCConf rscConf) {
+    String operationLogLocation = rscConf.get(RSCConf.Entry.LOGGING_OPERATION_LOG_LOCATION);
+    this.rootDirPath = operationLogLocation + File.separator
+        + PREFIX + rscConf.get(RSCConf.Entry.SESSION_ID);
+    isOperationLogEnabled = rscConf.getBoolean(RSCConf.Entry.LOGGING_OPERATION_ENABLED)
+        && new File(operationLogLocation).isDirectory();
+    String logLevel = rscConf.get(RSCConf.Entry.LOGGING_OPERATION_LEVEL);
+    if (isOperationLogEnabled) {
+      hiveConf = new HiveConf();
+      hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED,
+          isOperationLogEnabled);
+      hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION.varname,
+          operationLogLocation);
+      hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL.varname,
+          logLevel);
+      File rootDir = new File(this.rootDirPath);
+      if (!rootDir.exists()) {
+        rootDir.mkdir();
+      }
+    }
+  }
+
+  public void removeCurrentOperationLog() {
+    OperationLog.removeCurrentOperationLog();
+  }
+
+  public void registerOperationLog(String statementId) {
+    if (isOperationLogEnabled) {
+      OperationLog operationLog = logs.get(statementId);
+      if (operationLog == null) {
+        synchronized (this) {
+          File operationLogFile = new File(rootDirPath, statementId);
+          try {
+            if (operationLogFile.exists()) {
+              LOG.warn("The operation log file should not exist,"
+                  + " but it is already there: {}",
+                  operationLogFile.getAbsolutePath());
+              operationLogFile.delete();
+            }
+            if (!operationLogFile.createNewFile()) {
+              if (!operationLogFile.canRead() || !operationLogFile.canWrite()) {
+                LOG.warn("The operation log file can't be recreated,"
                    + " or it cannot be read or written: {}",
+                    operationLogFile.getAbsolutePath());
+                return;
+              }
+            }
+          } catch (Exception e) {
+            LOG.warn(String.format("Unable to create operation log file: %s",
+                operationLogFile.getAbsolutePath()), e);
+            return;
+          }
+          try {
+            operationLog = new OperationLog(statementId, operationLogFile, hiveConf);
+            logs.put(statementId, operationLog);
+          } catch (FileNotFoundException e) {
+            LOG.warn(String.format(
+                "Unable to instantiate OperationLog object for operation: %s",
+                statementId), e);
+            return;
+          }
+        }
+      }
+      if (operationLog != null) {
+        // Register this operationLog with the current thread.
+        OperationLog.setCurrentOperationLog(operationLog);
+      }
+    }
+  }
+
+  public void shutdown() {
+    if (isOperationLogEnabled) {
+      logs.forEach((k, v) -> v.close());
+      File rootDirFile = new File(rootDirPath);
+      if (rootDirFile.isDirectory()) {
+        rootDirFile.deleteOnExit();
+      }
+    }
+  }
+
+  public static OperationLog getOperationLogByThread() {
+    return OperationLog.getCurrentOperationLog();
+  }
+}
Index: server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala
===================================================================
--- server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala	(revision 8c7f03b4a8c5fae73c14bac7d97c62cc0fd6241d)
+++ server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala	(revision 6d2867261a75d30a44101b29d348d5957ecd8934)
@@ -104,6 +104,7 @@
     info(s"Creating Interactive session $id: [owner: $owner, request: $request]")
     val builder = new LivyClientBuilder()
       .setAll(builderProperties.asJava)
+      .setConf(RSCConf.Entry.SESSION_ID.key(), id.toString)
       .setConf("livy.client.session-id", id.toString)
       .setConf(RSCConf.Entry.DRIVER_CLASS.key(), "org.apache.livy.repl.ReplDriver")
       .setConf(RSCConf.Entry.PROXY_USER.key(), impersonatedUser.orNull)
@@ -493,6 +494,16 @@
       Option(r.statements(0))
     }
   }
+
+  def getStatementLog(stmtId: String, size: Long): Option[Statement] = {
+    ensureActive()
+    val r = client.get.getJobLog(stmtId, size).get()
+    if (r.statements.length < 1) {
+      None
+    } else {
+      Option(r.statements(0))
+    }
+  }
 
   def interrupt(): Future[Unit] = {
     stop()
Index: server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala
===================================================================
--- server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala	(revision 8c7f03b4a8c5fae73c14bac7d97c62cc0fd6241d)
+++ server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala	(revision 6d2867261a75d30a44101b29d348d5957ecd8934)
@@ -147,6 +147,17 @@
       Ok(Map("msg" -> "canceled"))
     }
   }
+
+  get("/:id/statements/:statementId/log") {
+    withViewAccessSession { session =>
+      val statementId = params("statementId")
+      val size = params.get("size").map(_.toLong).getOrElse(50L)
+      Map(
+        "statements" -> session.getStatementLog(statementId, size)
+      )
+    }
+  }
+
   // This endpoint is used by the client-http module to "connect" to an existing session and
   // update its last activity time. It performs authorization checks to make sure the caller
   // has access to the session, so even though it returns the same data, it behaves differently
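A hedged sketch of exercising the new endpoint (host, port, and ids are assumed; the exact
JSON shape depends on the servlet's serializer):

    // GET /sessions/:id/statements/:statementId/log?size=<maxRows>
    val url = "http://localhost:8998/sessions/0/statements/0/log?size=100"
    println(scala.io.Source.fromURL(url).mkString)
    // Expected (approximate): {"statements":{"id":0,...,"output":"<operation log lines>"}}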
Index: repl/src/test/scala/org/apache/livy/repl/BaseSessionSpec.scala
===================================================================
--- repl/src/test/scala/org/apache/livy/repl/BaseSessionSpec.scala	(date 1554898661000)
+++ repl/src/test/scala/org/apache/livy/repl/BaseSessionSpec.scala	(date 1555129871000)
@@ -39,7 +39,8 @@
 
   implicit val formats = DefaultFormats
 
-  private val rscConf = new RSCConf(new Properties()).set(RSCConf.Entry.SESSION_KIND, kind.toString)
+  private val defaultRscConf = new RSCConf(new Properties())
+    .set(RSCConf.Entry.SESSION_KIND, kind.toString)
 
   private val sparkConf = new SparkConf()
 
@@ -53,6 +54,11 @@
   }
 
   protected def withSession(testCode: Session => Any): Unit = {
+    withConfSession(testCode)()
+  }
+
+  protected def withConfSession(testCode: Session => Any)
+      (rscConf: RSCConf = defaultRscConf): Unit = {
     val stateChangedCalled = new AtomicInteger()
     val session = new Session(rscConf, sparkConf, None, { _ => stateChangedCalled.incrementAndGet() })
@@ -70,7 +76,7 @@
   }
 
   it should "start in the starting or idle state" in {
-    val session = new Session(rscConf, sparkConf)
+    val session = new Session(defaultRscConf, sparkConf)
     val future = session.start()
     try {
       Await.ready(future, 60 seconds)
Index: repl/src/test/scala/org/apache/livy/repl/SparkSessionSpec.scala
===================================================================
--- repl/src/test/scala/org/apache/livy/repl/SparkSessionSpec.scala	(date 1554898661000)
+++ repl/src/test/scala/org/apache/livy/repl/SparkSessionSpec.scala	(date 1555129871000)
@@ -17,6 +17,9 @@
 
 package org.apache.livy.repl
 
+import java.io.File
+import java.util.Properties
+
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
@@ -26,10 +29,22 @@
 import org.scalatest.concurrent.Eventually._
 
+import org.apache.livy.rsc.RSCConf
 import org.apache.livy.rsc.driver.StatementState
 import org.apache.livy.sessions._
 
 class SparkSessionSpec extends BaseSessionSpec(Spark) {
 
+  private def rscConfWithLog = {
+    val tmpLogDir = new File(s"target${File.separator}operation-log-tmp-test")
+    tmpLogDir.deleteOnExit()
+    tmpLogDir.mkdirs()
+    val rscConf = new RSCConf(new Properties())
+    rscConf.set(RSCConf.Entry.SESSION_KIND, Spark.toString)
+    rscConf.set(RSCConf.Entry.LOGGING_OPERATION_ENABLED, true)
+    rscConf.set(RSCConf.Entry.LOGGING_OPERATION_LOG_LOCATION, tmpLogDir.getAbsolutePath)
+    rscConf.set(RSCConf.Entry.SESSION_ID, "session_id")
+  }
+
   it should "execute `1 + 2` == 3" in withSession { session =>
     val statement = execute(session)("1 + 2")
     statement.id should equal (0)
@@ -267,4 +282,17 @@
       session.progressOfStatement(stmtId) should be(1.0)
     }
   }
+
+  it should "contain statement logs" in withConfSession { session =>
+    val executeCode =
+      """
+        |sc.parallelize(1 to 2, 2).map(i => (i, 1)).collect()
+      """.stripMargin
+
+    val stmtId = session.execute(executeCode)
+    eventually(timeout(30 seconds), interval(100 millis)) {
+      val log = session.readLog(stmtId, 2000L)
+      log.mkString should include ("DAGScheduler")
+    }
+  }(rscConfWithLog)
 }
Index: conf/livy-client.conf.template
===================================================================
--- conf/livy-client.conf.template	(date 1555129871000)
+++ conf/livy-client.conf.template	(date 1555129963000)
@@ -102,3 +102,11 @@
 
 # Number of statements kept in driver's memory
 # livy.rsc.retained-statements = 100
+
+# When true, the RSC driver saves operation logs and makes them available to clients
+# livy.rsc.logging.operation.enabled = false
+# Top-level directory where operation logs are stored if operation logging is enabled
+# livy.rsc.logging.operation.log.location =
+# The logging level for statement operation logs. Possible values: EXECUTION, PERFORMANCE, VERBOSE, NONE
+# livy.rsc.logging.operation.level = VERBOSE
+
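End to end, with illustrative values: once operation logging is switched on in
livy-client.conf, the driver writes each statement's log under
<log.location>/livy-<sessionId>/<statementId> (per LogManager above) and serves it through
the new /log endpoint:

    livy.rsc.logging.operation.enabled = true
    livy.rsc.logging.operation.log.location = /tmp/livy/operation-logs
    livy.rsc.logging.operation.level = VERBOSE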