diff --git a/common/src/java/org/apache/hadoop/hive/common/LogUtils.java b/common/src/java/org/apache/hadoop/hive/common/LogUtils.java index 01b2e7c..5fb0a01 100644 --- a/common/src/java/org/apache/hadoop/hive/common/LogUtils.java +++ b/common/src/java/org/apache/hadoop/hive/common/LogUtils.java @@ -24,9 +24,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.log4j.MDC; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.core.config.Configurator; import org.apache.logging.log4j.core.impl.Log4jContextFactory; +import org.apache.logging.log4j.spi.DefaultThreadContextMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +49,11 @@ private static final String KEY_TO_MASK_WITH = "password"; private static final String MASKED_VALUE = "###_MASKED_###"; + // Constants of the key strings for the log4j ThreadContext. + public static final String SESSIONID_LOG_KEY = "sessionId"; + public static final String QUERYID_LOG_KEY = "queryId"; + public static final String OPERATIONLOG_LEVEL_KEY = "operationLogLevel"; + @SuppressWarnings("serial") public static class LogInitializationException extends Exception { public LogInitializationException(String msg) { @@ -109,6 +116,8 @@ public static String initHiveLog4jCommon(HiveConf conf, ConfVars confVarName) System.setProperty(HiveConf.ConfVars.HIVEQUERYID.toString(), queryId); } final boolean async = checkAndSetAsyncLogging(conf); + // required for MDC based routing appender so that child threads can inherit the MDC context + System.setProperty(DefaultThreadContextMap.INHERITABLE_MAP, "true"); Configurator.initialize(null, log4jFileName); logConfigLocation(conf); return "Logging initialized using configuration in " + log4jConfigFile + " Async: " + async; @@ -151,6 +160,7 @@ private static String initHiveLog4jDefault( } if (hive_l4j != null) { final boolean async = checkAndSetAsyncLogging(conf); + System.setProperty(DefaultThreadContextMap.INHERITABLE_MAP, "true"); Configurator.initialize(null, hive_l4j.toString()); logConfigLocation(conf); return (logMessage + "\n" + "Logging initialized using configuration in " + hive_l4j + @@ -192,4 +202,12 @@ public static String maskIfPassword(String key, String value) { } return value; } + + public static void registerLoggingContext(Configuration conf) { + MDC.put(SESSIONID_LOG_KEY, HiveConf.getVar(conf, HiveConf.ConfVars.HIVESESSIONID)); + MDC.put(QUERYID_LOG_KEY, HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID)); + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) { + MDC.put(OPERATIONLOG_LEVEL_KEY, HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL)); + } + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index d981119..12a9e31 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -1373,11 +1373,6 @@ private ReentrantLock tryAcquireCompileLock(boolean isParallelEnabled, LOG.debug("Waiting to acquire compile lock: " + command); } - OperationLog ol = OperationLog.getCurrentOperationLog(); - if (ol != null) { - ol.writeOperationLog(LoggingLevel.EXECUTION, "Waiting to acquire compile lock.\n"); - } - if (maxCompileLockWaitTime > 0) { try { if(!compileLock.tryLock(maxCompileLockWaitTime, TimeUnit.SECONDS)) { @@ -1397,9 +1392,6 @@ private ReentrantLock 
tryAcquireCompileLock(boolean isParallelEnabled, } LOG.debug(lockAcquiredMsg); - if (ol != null) { - ol.writeOperationLog(LoggingLevel.EXECUTION, lockAcquiredMsg + "\n"); - } return compileLock; } @@ -2092,13 +2084,6 @@ private void logMrWarning(int mrJobs) { } String warning = HiveConf.generateMrDeprecationWarning(); LOG.warn(warning); - warning = "WARNING: " + warning; - console.printInfo(warning); - // Propagate warning to beeline via operation log. - OperationLog ol = OperationLog.getCurrentOperationLog(); - if (ol != null) { - ol.writeOperationLog(LoggingLevel.EXECUTION, warning + "\n"); - } } private void setErrorMsgAndDetail(int exitVal, Throwable downstreamError, Task tsk) { @@ -2173,7 +2158,6 @@ private TaskRunner launchTask(Task tsk, String queryId, if (LOG.isInfoEnabled()){ LOG.info("Starting task [" + tsk + "] in parallel"); } - tskRun.setOperationLog(OperationLog.getCurrentOperationLog()); tskRun.start(); } else { if (LOG.isInfoEnabled()){ diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java index a596e92..eddc31e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java @@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.hive.ql.metadata.Hive; -import org.apache.hadoop.hive.ql.session.OperationLog; import org.apache.hadoop.hive.ql.session.SessionState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,7 +34,6 @@ protected Task tsk; protected TaskResult result; protected SessionState ss; - private OperationLog operationLog; private static AtomicLong taskCounter = new AtomicLong(0); private static ThreadLocal taskRunnerID = new ThreadLocal() { @Override @@ -74,7 +72,6 @@ public boolean isRunning() { public void run() { runner = Thread.currentThread(); try { - OperationLog.setCurrentOperationLog(operationLog); SessionState.start(ss); runSequential(); } finally { @@ -113,8 +110,4 @@ public void runSequential() { public static long getTaskRunnerID () { return taskRunnerID.get(); } - - public void setOperationLog(OperationLog operationLog) { - this.operationLog = operationLog; - } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java index 1945163..a5c0fcd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java @@ -68,6 +68,7 @@ import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl; import org.apache.hadoop.hive.ql.io.IOPrepareCache; +import org.apache.hadoop.hive.ql.log.LogDivertAppender; import org.apache.hadoop.hive.ql.log.NullAppender; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.FetchWork; @@ -632,6 +633,7 @@ private static void printUsage() { private static void setupChildLog4j(Configuration conf) { try { LogUtils.initHiveExecLog4j(); + LogDivertAppender.registerRoutingAppender(conf); } catch (LogInitializationException e) { System.err.println(e.getMessage()); } @@ -703,6 +705,8 @@ public static void main(String[] args) throws IOException, HiveException { } System.setProperty(HiveConf.ConfVars.HIVEQUERYID.toString(), queryId); + LogUtils.registerLoggingContext(conf); + if (noLog) { // If started from main(), and noLog is on, we should not output // any logs. 
To turn the log on, please set -Dtest.silent=false diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java index 591ea97..595d1bd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java @@ -327,18 +327,8 @@ public int executeInChildVM(DriverContext driverContext) { CachingPrintStream errPrintStream = new CachingPrintStream(System.err); - StreamPrinter outPrinter; - StreamPrinter errPrinter; - OperationLog operationLog = OperationLog.getCurrentOperationLog(); - if (operationLog != null) { - outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out, - operationLog.getPrintStream()); - errPrinter = new StreamPrinter(executor.getErrorStream(), null, errPrintStream, - operationLog.getPrintStream()); - } else { - outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out); - errPrinter = new StreamPrinter(executor.getErrorStream(), null, errPrintStream); - } + StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out); + StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, errPrintStream); outPrinter.start(); errPrinter.start(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java b/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java new file mode 100644 index 0000000..64ce100 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java @@ -0,0 +1,249 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.log;
+
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.hive.common.LogUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.session.OperationLog;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.RandomAccessFileAppender;
+import org.apache.logging.log4j.core.appender.routing.Route;
+import org.apache.logging.log4j.core.appender.routing.Routes;
+import org.apache.logging.log4j.core.appender.routing.RoutingAppender;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.apache.logging.log4j.core.config.Node;
+import org.apache.logging.log4j.core.config.plugins.Plugin;
+import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
+import org.apache.logging.log4j.core.config.plugins.PluginFactory;
+import org.apache.logging.log4j.core.config.plugins.processor.PluginEntry;
+import org.apache.logging.log4j.core.config.plugins.util.PluginType;
+import org.apache.logging.log4j.core.filter.AbstractFilter;
+import org.apache.logging.log4j.core.layout.PatternLayout;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+
+/**
+ * Divert appender to redirect operation logs to separate files.
+ */
+public class LogDivertAppender {
+  private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(LogDivertAppender.class.getName());
+  public static final String verboseLayout = "%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n";
+  public static final String nonVerboseLayout = "%-5p : %m%n";
+
+  /**
+   * A log filter that filters messages coming from loggers with the given names.
+   * It can be used as a whitelist filter or a blacklist filter.
+   * We apply the blacklist filter to the loggers used by the log diversion code itself, so that
+   * they don't generate more logs for themselves when they process logs.
+   * The whitelist filter is used for less verbose log collection.
+   */
+  @Plugin(name = "NameFilter", category = "Core", elementType = "filter", printObject = true)
+  private static class NameFilter extends AbstractFilter {
+    private Pattern namePattern;
+    private OperationLog.LoggingLevel loggingMode;
+
+    /* Patterns that are excluded in verbose logging level.
+     * Filter out messages coming from log processing classes, or we'll run an infinite loop.
+     */
+    private static final Pattern verboseExcludeNamePattern = Pattern.compile(Joiner.on("|").
+      join(new String[]{LOG.getName(), OperationLog.class.getName()}));
+
+    /* Patterns that are included in execution logging level.
+     * In execution mode, show only select logger messages.
+     */
+    private static final Pattern executionIncludeNamePattern = Pattern.compile(Joiner.on("|").
+      join(new String[]{"org.apache.hadoop.mapreduce.JobSubmitter",
+        "org.apache.hadoop.mapreduce.Job", "SessionState", Task.class.getName(),
+        Driver.class.getName(), "org.apache.hadoop.hive.ql.exec.spark.status.SparkJobMonitor"}));
+
+    /* Patterns that are included in performance logging level.
+     * In performance mode, show execution and performance logger messages.
+     */
+    private static final Pattern performanceIncludeNamePattern = Pattern.compile(
+      executionIncludeNamePattern.pattern() + "|" + PerfLogger.class.getName());
+
+    private void setCurrentNamePattern(OperationLog.LoggingLevel mode) {
+      if (mode == OperationLog.LoggingLevel.VERBOSE) {
+        this.namePattern = verboseExcludeNamePattern;
+      } else if (mode == OperationLog.LoggingLevel.EXECUTION) {
+        this.namePattern = executionIncludeNamePattern;
+      } else if (mode == OperationLog.LoggingLevel.PERFORMANCE) {
+        this.namePattern = performanceIncludeNamePattern;
+      }
+    }
+
+    public NameFilter(OperationLog.LoggingLevel loggingMode) {
+      this.loggingMode = loggingMode;
+      setCurrentNamePattern(loggingMode);
+    }
+
+    @Override
+    public Result filter(LogEvent event) {
+      boolean excludeMatches = (loggingMode == OperationLog.LoggingLevel.VERBOSE);
+
+      String logLevel = event.getContextMap().get(LogUtils.OPERATIONLOG_LEVEL_KEY);
+      logLevel = logLevel == null ? "" : logLevel;
+      OperationLog.LoggingLevel currentLoggingMode = OperationLog.getLoggingLevel(logLevel);
+      // If logging is disabled, deny everything.
+      if (currentLoggingMode == OperationLog.LoggingLevel.NONE) {
+        return Result.DENY;
+      }
+      // Look at the current session's setting
+      // and set the pattern and excludeMatches accordingly.
+      if (currentLoggingMode != loggingMode) {
+        loggingMode = currentLoggingMode;
+        excludeMatches = (loggingMode == OperationLog.LoggingLevel.VERBOSE);
+        setCurrentNamePattern(loggingMode);
+      }
+
+      boolean isMatch = namePattern.matcher(event.getLoggerName()).matches();
+
+      if (excludeMatches == isMatch) {
+        // Deny if this is a blacklist filter (excludeMatches = true) and it matched,
+        // or if this is a whitelist filter and it didn't match.
+        return Result.DENY;
+      }
+
+      return Result.NEUTRAL;
+    }
+
+    @PluginFactory
+    public static NameFilter createFilter(
+        @PluginAttribute("loggingLevel") final String loggingLevel) {
+      // loggingLevel is required. Error out if it is not set.
+      Preconditions.checkNotNull(loggingLevel,
+          "loggingLevel must be specified for " + NameFilter.class.getName());
+
+      return new NameFilter(OperationLog.getLoggingLevel(loggingLevel));
+    }
+  }
+
+  /**
+   * Programmatically register a routing appender to Log4J configuration, which
+   * automatically writes the log of each query to an individual file.
+   * The equivalent property configuration is as follows:
+   * # queryId based routing file appender
+      appender.query-routing.type = Routing
+      appender.query-routing.name = query-routing
+      appender.query-routing.routes.type = Routes
+      appender.query-routing.routes.pattern = $${ctx:queryId}
+      # default route
+      appender.query-routing.routes.route-default.type = Route
+      appender.query-routing.routes.route-default.key = $${ctx:queryId}
+      appender.query-routing.routes.route-default.app.type = null
+      appender.query-routing.routes.route-default.app.name = Null
+      # queryId based route
+      appender.query-routing.routes.route-mdc.type = Route
+      appender.query-routing.routes.route-mdc.name = IrrelevantName-query-routing
+      appender.query-routing.routes.route-mdc.app.type = RandomAccessFile
+      appender.query-routing.routes.route-mdc.app.name = query-file-appender
+      appender.query-routing.routes.route-mdc.app.fileName = ${sys:hive.log.dir}/${ctx:sessionId}/${ctx:queryId}
+      appender.query-routing.routes.route-mdc.app.layout.type = PatternLayout
+      appender.query-routing.routes.route-mdc.app.layout.pattern = %d{ISO8601} %5p %c{2}: %m%n
+   * @param conf the configuration for HiveServer2 instance
+   */
+  public static void registerRoutingAppender(org.apache.hadoop.conf.Configuration conf) {
+    String loggingLevel = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL);
+    OperationLog.LoggingLevel loggingMode = OperationLog.getLoggingLevel(loggingLevel);
+    String layout = loggingMode == OperationLog.LoggingLevel.VERBOSE ? verboseLayout : nonVerboseLayout;
+    String logLocation = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION);
+
+    // Create NullAppender
+    PluginEntry nullEntry = new PluginEntry();
+    nullEntry.setClassName(NullAppender.class.getName());
+    nullEntry.setKey("null");
+    nullEntry.setName("appender");
+    PluginType nullChildType = new PluginType(nullEntry, NullAppender.class, "appender");
+    Node nullChildNode = new Node(null, "Null", nullChildType);
+
+    // Create default route
+    PluginEntry defaultEntry = new PluginEntry();
+    defaultEntry.setClassName(Route.class.getName());
+    defaultEntry.setKey("route");
+    defaultEntry.setName("Route");
+    PluginType defaultType = new PluginType(defaultEntry, Route.class, "Route");
+    Node nullNode = new Node(null, "Route", defaultType);
+    nullNode.getChildren().add(nullChildNode);
+    Route defaultRoute = Route.createRoute(null, "${ctx:queryId}", nullNode);
+
+    // Create queryId based route
+    PluginEntry entry = new PluginEntry();
+    entry.setClassName(Route.class.getName());
+    entry.setKey("route");
+    entry.setName("Route");
+    PluginType type = new PluginType(entry, Route.class, "Route");
+    Node node = new Node(null, "Route", type);
+
+    PluginEntry childEntry = new PluginEntry();
+    childEntry.setClassName(RandomAccessFileAppender.class.getName());
+    childEntry.setKey("randomaccessfile");
+    childEntry.setName("appender");
+    PluginType childType = new PluginType(childEntry, RandomAccessFileAppender.class, "appender");
+    Node childNode = new Node(node, "RandomAccessFile", childType);
+    childNode.getAttributes().put("name", "query-file-appender");
+    childNode.getAttributes().put("fileName", logLocation + "/${ctx:sessionId}/${ctx:queryId}");
+    node.getChildren().add(childNode);
+
+    PluginEntry filterEntry = new PluginEntry();
+    filterEntry.setClassName(NameFilter.class.getName());
+    filterEntry.setKey("namefilter");
+    filterEntry.setName("namefilter");
+    PluginType filterType = new PluginType(filterEntry,
        NameFilter.class, "filter");
+    Node filterNode = new Node(childNode, "NameFilter", filterType);
+    filterNode.getAttributes().put("loggingLevel", loggingMode.name());
+    childNode.getChildren().add(filterNode);
+
+    PluginEntry layoutEntry = new PluginEntry();
+    layoutEntry.setClassName(PatternLayout.class.getName());
+    layoutEntry.setKey("patternlayout");
+    layoutEntry.setName("layout");
+    PluginType layoutType = new PluginType(layoutEntry, PatternLayout.class, "layout");
+    Node layoutNode = new Node(childNode, "PatternLayout", layoutType);
+    layoutNode.getAttributes().put("pattern", layout);
+    childNode.getChildren().add(layoutNode);
+
+    Route mdcRoute = Route.createRoute(null, null, node);
+    Routes routes = Routes.createRoutes("${ctx:queryId}", defaultRoute, mdcRoute);
+
+    LoggerContext context = (LoggerContext) LogManager.getContext(false);
+    Configuration configuration = context.getConfiguration();
+
+    RoutingAppender routingAppender = RoutingAppender.createAppender("query-routing",
+        "true",
+        routes,
+        configuration,
+        null,
+        null,
+        null);
+
+    LoggerConfig loggerConfig = configuration.getRootLogger();
+    loggerConfig.addAppender(routingAppender, null, null);
+    context.updateLoggers();
+    routingAppender.start();
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java
index 08d0544..d2f8861 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java
@@ -337,11 +337,6 @@ private void logTypeWarning(String colName, String colType) {
         + " is passed for " + colName + ".";
     warning = "WARNING: " + warning;
     console.printInfo(warning);
-    // Propagate warning to beeline via operation log.
-    OperationLog ol = OperationLog.getCurrentOperationLog();
-    if (ol != null) {
-      ol.writeOperationLog(LoggingLevel.EXECUTION, warning + "\n");
-    }
   }
 
   @Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java b/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java
index 18216f2..c37a633 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java
@@ -39,10 +39,6 @@ private final LogFile logFile;
   private LoggingLevel opLoggingLevel = LoggingLevel.UNKNOWN;
 
-  public PrintStream getPrintStream() {
-    return logFile.getPrintStream();
-  }
-
   public enum LoggingLevel {
     NONE, EXECUTION, PERFORMANCE, VERBOSE, UNKNOWN
   }
@@ -76,47 +72,6 @@ public LoggingLevel getOpLoggingLevel() {
   }
 
   /**
-   * Singleton OperationLog object per thread.
-   */
-  private static final ThreadLocal<OperationLog> THREAD_LOCAL_OPERATION_LOG = new
-      ThreadLocal<OperationLog>() {
-    @Override
-    protected OperationLog initialValue() {
-      return null;
-    }
-  };
-
-  public static void setCurrentOperationLog(OperationLog operationLog) {
-    THREAD_LOCAL_OPERATION_LOG.set(operationLog);
-  }
-
-  public static OperationLog getCurrentOperationLog() {
-    return THREAD_LOCAL_OPERATION_LOG.get();
-  }
-
-  public static void removeCurrentOperationLog() {
-    THREAD_LOCAL_OPERATION_LOG.remove();
-  }
-
-  /**
-   * Write operation execution logs into log file
-   * @param operationLogMessage one line of log emitted from log4j
-   */
-  public void writeOperationLog(String operationLogMessage) {
-    logFile.write(operationLogMessage);
-  }
-
-  /**
-   * Write operation execution logs into log file
-   * @param operationLogMessage one line of log emitted from log4j
-   */
-  public void writeOperationLog(LoggingLevel level, String operationLogMessage) {
-    if (opLoggingLevel.compareTo(level) < 0) return;
-    logFile.write(operationLogMessage);
-  }
-
-
   /**
    * Read operation execution logs from log file
    * @param isFetchFirst true if the Enum FetchOrientation value is Fetch_First
    * @param maxRows the max number of fetched lines from log
@@ -136,26 +91,18 @@ public void close() {
   }
 
   /**
-   * Wrapper for read/write the operation log file
+   * Wrapper for reading the operation log file
    */
   private class LogFile {
     private final File file;
     private BufferedReader in;
-    private final PrintStream out;
     private volatile boolean isRemoved;
 
     LogFile(File file) throws FileNotFoundException {
       this.file = file;
-      in = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
-      out = new PrintStream(new FileOutputStream(file));
       isRemoved = false;
     }
 
-    synchronized void write(String msg) {
-      // write log to the file
-      out.print(msg);
-    }
-
     synchronized List<String> read(boolean isFetchFirst, long maxRows) throws SQLException {
       // reset the BufferReader, if fetching from the beginning of the file
@@ -171,9 +118,6 @@ synchronized void remove() {
       if (in != null) {
         in.close();
       }
-      if (out != null) {
-        out.close();
-      }
       if (!isRemoved) {
         FileUtils.forceDelete(file);
         isRemoved = true;
@@ -195,13 +139,7 @@ private void resetIn() {
       try {
         in = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
       } catch (FileNotFoundException e) {
-        if (isRemoved) {
-          throw new SQLException("The operation has been closed and its log file " +
-              file.getAbsolutePath() + " has been removed.", e);
-        } else {
-          throw new SQLException("Operation Log file " + file.getAbsolutePath() +
-              " is not found.", e);
-        }
+        return new ArrayList<String>();
       }
     }
@@ -227,9 +165,5 @@ private void resetIn() {
       }
       return logs;
     }
-
-    public PrintStream getPrintStream() {
-      return out;
-    }
   }
}
diff --git a/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java b/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java
index 8f08c2e..c3d9b67 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java
@@ -126,12 +126,8 @@ public void runInternal() throws HiveSQLException {
         resultSchema = new TableSchema();
       }
       if (response.getConsoleMessages() != null) {
-        // Propagate processor messages (if any) to beeline or other client.
- OperationLog ol = OperationLog.getCurrentOperationLog(); - if (ol != null) { - for (String consoleMsg : response.getConsoleMessages()) { - ol.writeOperationLog(LoggingLevel.EXECUTION, consoleMsg + "\n"); - } + for (String consoleMsg : response.getConsoleMessages()) { + LOG.info(consoleMsg); } } } catch (HiveSQLException e) { diff --git a/service/src/java/org/apache/hive/service/cli/operation/LogDivertAppender.java b/service/src/java/org/apache/hive/service/cli/operation/LogDivertAppender.java deleted file mode 100644 index eaf1acb..0000000 --- a/service/src/java/org/apache/hive/service/cli/operation/LogDivertAppender.java +++ /dev/null @@ -1,249 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hive.service.cli.operation; - -import java.io.ByteArrayOutputStream; -import java.io.OutputStream; -import java.io.OutputStreamWriter; -import java.io.Serializable; -import java.util.regex.Pattern; - -import org.apache.hadoop.hive.ql.Driver; -import org.apache.hadoop.hive.ql.exec.Task; -import org.apache.hadoop.hive.ql.log.PerfLogger; -import org.apache.hadoop.hive.ql.session.OperationLog; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.core.Appender; -import org.apache.logging.log4j.core.Filter; -import org.apache.logging.log4j.core.Layout; -import org.apache.logging.log4j.core.LogEvent; -import org.apache.logging.log4j.core.LoggerContext; -import org.apache.logging.log4j.core.appender.AbstractOutputStreamAppender; -import org.apache.logging.log4j.core.appender.ConsoleAppender; -import org.apache.logging.log4j.core.appender.OutputStreamManager; -import org.apache.logging.log4j.core.config.Configuration; -import org.apache.logging.log4j.core.filter.AbstractFilter; -import org.apache.logging.log4j.core.layout.PatternLayout; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Joiner; - -/** - * Divert appender to redirect operation logs to separate files. - */ -public class LogDivertAppender - extends AbstractOutputStreamAppender { - private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(LogDivertAppender.class.getName()); - private static LoggerContext context = (LoggerContext) LogManager.getContext(false); - private static Configuration configuration = context.getConfiguration(); - public static final Layout verboseLayout = PatternLayout.createLayout( - "%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n", null, configuration, null, null, true, false, null, null); - public static final Layout nonVerboseLayout = PatternLayout.createLayout( - "%-5p : %m%n", null, configuration, null, null, true, false, null, null); - - private final OperationManager operationManager; - private final StringOutputStreamManager manager; - private boolean isVerbose; - private final Layout layout; - - /** - * Instantiate a WriterAppender and set the output destination to a - * new {@link OutputStreamWriter} initialized with os - * as its {@link OutputStream}. - * - * @param name The name of the Appender. - * @param filter Filter - * @param manager The OutputStreamManager. - * @param operationManager Operation manager - */ - protected LogDivertAppender(String name, Filter filter, - StringOutputStreamManager manager, OperationManager operationManager, - OperationLog.LoggingLevel loggingMode) { - super(name, null, filter, false, true, manager); - this.operationManager = operationManager; - this.manager = manager; - this.isVerbose = (loggingMode == OperationLog.LoggingLevel.VERBOSE); - this.layout = getDefaultLayout(); - } - - public Layout getDefaultLayout() { - // There should be a ConsoleAppender. Copy its Layout. 
- Logger root = LogManager.getRootLogger(); - Layout layout = null; - - for (Appender ap : ((org.apache.logging.log4j.core.Logger) root).getAppenders().values()) { - if (ap.getClass().equals(ConsoleAppender.class)) { - layout = ap.getLayout(); - break; - } - } - - return layout; - } - - /** - * A log filter that filters messages coming from the logger with the given names. - * It be used as a white list filter or a black list filter. - * We apply black list filter on the Loggers used by the log diversion stuff, so that - * they don't generate more logs for themselves when they process logs. - * White list filter is used for less verbose log collection - */ - private static class NameFilter extends AbstractFilter { - private Pattern namePattern; - private OperationLog.LoggingLevel loggingMode; - private final OperationManager operationManager; - - /* Patterns that are excluded in verbose logging level. - * Filter out messages coming from log processing classes, or we'll run an infinite loop. - */ - private static final Pattern verboseExcludeNamePattern = Pattern.compile(Joiner.on("|"). - join(new String[]{LOG.getName(), OperationLog.class.getName(), - OperationManager.class.getName()})); - - /* Patterns that are included in execution logging level. - * In execution mode, show only select logger messages. - */ - private static final Pattern executionIncludeNamePattern = Pattern.compile(Joiner.on("|"). - join(new String[]{"org.apache.hadoop.mapreduce.JobSubmitter", - "org.apache.hadoop.mapreduce.Job", "SessionState", Task.class.getName(), - Driver.class.getName(), "org.apache.hadoop.hive.ql.exec.spark.status.SparkJobMonitor"})); - - /* Patterns that are included in performance logging level. - * In performance mode, show execution and performance logger messages. - */ - private static final Pattern performanceIncludeNamePattern = Pattern.compile( - executionIncludeNamePattern.pattern() + "|" + PerfLogger.class.getName()); - - private void setCurrentNamePattern(OperationLog.LoggingLevel mode) { - if (mode == OperationLog.LoggingLevel.VERBOSE) { - this.namePattern = verboseExcludeNamePattern; - } else if (mode == OperationLog.LoggingLevel.EXECUTION) { - this.namePattern = executionIncludeNamePattern; - } else if (mode == OperationLog.LoggingLevel.PERFORMANCE) { - this.namePattern = performanceIncludeNamePattern; - } - } - - public NameFilter(OperationLog.LoggingLevel loggingMode, OperationManager op) { - this.operationManager = op; - this.loggingMode = loggingMode; - setCurrentNamePattern(loggingMode); - } - - @Override - public Result filter(LogEvent event) { - OperationLog log = operationManager.getOperationLogByThread(); - boolean excludeMatches = (loggingMode == OperationLog.LoggingLevel.VERBOSE); - - if (log == null) { - return Result.DENY; - } - - OperationLog.LoggingLevel currentLoggingMode = log.getOpLoggingLevel(); - // If logging is disabled, deny everything. - if (currentLoggingMode == OperationLog.LoggingLevel.NONE) { - return Result.DENY; - } - // Look at the current session's setting - // and set the pattern and excludeMatches accordingly. 
- if (currentLoggingMode != loggingMode) { - loggingMode = currentLoggingMode; - excludeMatches = (loggingMode == OperationLog.LoggingLevel.VERBOSE); - setCurrentNamePattern(loggingMode); - } - - boolean isMatch = namePattern.matcher(event.getLoggerName()).matches(); - - if (excludeMatches == isMatch) { - // Deny if this is black-list filter (excludeMatches = true) and it - // matched or if this is whitelist filter and it didn't match - return Result.DENY; - } - return Result.NEUTRAL; - } - } - - public static LogDivertAppender createInstance(OperationManager operationManager, - OperationLog.LoggingLevel loggingMode) { - return new LogDivertAppender("LogDivertAppender", new NameFilter(loggingMode, operationManager), - new StringOutputStreamManager(new ByteArrayOutputStream(), "StringStream", null), - operationManager, loggingMode); - } - - public String getOutput() { - return new String(manager.getStream().toByteArray()); - } - - @Override - public void start() { - super.start(); - } - - @Override - public Layout getLayout() { - - // If there is a logging level change from verbose->non-verbose or vice-versa since - // the last subAppend call, change the layout to preserve consistency. - OperationLog log = operationManager.getOperationLogByThread(); - if (log != null) { - isVerbose = (log.getOpLoggingLevel() == OperationLog.LoggingLevel.VERBOSE); - } - - // layout is immutable in log4j2, so we cheat here and return a different layout when - // verbosity changes - if (isVerbose) { - return verboseLayout; - } else { - return layout == null ? nonVerboseLayout : layout; - } - } - - @Override - public void append(LogEvent event) { - super.append(event); - - String logOutput = getOutput(); - manager.reset(); - - OperationLog log = operationManager.getOperationLogByThread(); - if (log == null) { - LOG.debug(" ---+++=== Dropped log event from thread " + event.getThreadName()); - return; - } - log.writeOperationLog(logOutput); - } - - protected static class StringOutputStreamManager extends OutputStreamManager { - ByteArrayOutputStream stream; - - protected StringOutputStreamManager(ByteArrayOutputStream os, String streamName, - Layout layout) { - super(os, streamName, layout, true); - stream = os; - } - - public ByteArrayOutputStream getStream() { - return stream; - } - - public void reset() { - stream.reset(); - } - } -} diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java index 11a820f..b01a69d 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java @@ -27,6 +27,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hive.common.LogUtils; import org.apache.hadoop.hive.common.metrics.common.Metrics; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; @@ -35,7 +36,6 @@ import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.OperationLog; -import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.service.cli.FetchOrientation; import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.OperationHandle; @@ -46,17 +46,13 @@ import org.apache.hive.service.cli.TableSchema; import 
org.apache.hive.service.cli.session.HiveSession; import org.apache.hive.service.rpc.thrift.TProtocolVersion; -import org.apache.logging.log4j.ThreadContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; import com.google.common.collect.Sets; public abstract class Operation { - // Constants of the key strings for the log4j ThreadContext. - public static final String SESSIONID_LOG_KEY = "sessionId"; - public static final String QUERYID_LOG_KEY = "queryId"; - protected final HiveSession parentSession; private volatile OperationState state = OperationState.INITIALIZED; private volatile MetricsScope currentStateScope; @@ -212,44 +208,9 @@ public boolean isDone() { protected void createOperationLog() { if (parentSession.isOperationLogEnabled()) { - File operationLogFile = new File(parentSession.getOperationLogSessionDir(), - opHandle.getHandleIdentifier().toString()); + File operationLogFile = new File(parentSession.getOperationLogSessionDir(), queryState.getQueryId()); isOperationLogEnabled = true; - // create log file - try { - if (operationLogFile.exists()) { - LOG.warn("The operation log file should not exist, but it is already there: " + - operationLogFile.getAbsolutePath()); - operationLogFile.delete(); - } - if (!operationLogFile.getParentFile().exists()) { - LOG.warn("Operations log directory for this session does not exist, it could have been deleted " + - "externally. Recreating the directory for future queries in this session but the older operation " + - "logs for this session are no longer available"); - if (!operationLogFile.getParentFile().mkdir()) { - LOG.warn("Log directory for this session could not be created, disabling " + - "operation logs: " + operationLogFile.getParentFile().getAbsolutePath()); - isOperationLogEnabled = false; - return; - } - } - if (!operationLogFile.createNewFile()) { - // the log file already exists and cannot be deleted. - // If it can be read/written, keep its contents and use it. 
- if (!operationLogFile.canRead() || !operationLogFile.canWrite()) { - LOG.warn("The already existed operation log file cannot be recreated, " + - "and it cannot be read or written: " + operationLogFile.getAbsolutePath()); - isOperationLogEnabled = false; - return; - } - } - } catch (Exception e) { - LOG.warn("Unable to create operation log file: " + operationLogFile.getAbsolutePath(), e); - isOperationLogEnabled = false; - return; - } - // create OperationLog object with above log file try { operationLog = new OperationLog(opHandle.toString(), operationLogFile, parentSession.getHiveConf()); @@ -259,15 +220,6 @@ protected void createOperationLog() { isOperationLogEnabled = false; return; } - - // register this operationLog to current thread - OperationLog.setCurrentOperationLog(operationLog); - } - } - - protected void unregisterOperationLog() { - if (isOperationLogEnabled) { - OperationLog.removeCurrentOperationLog(); } } @@ -284,15 +236,14 @@ protected void beforeRun() { * Register logging context so that Log4J can print QueryId and/or SessionId for each message */ protected void registerLoggingContext() { - ThreadContext.put(SESSIONID_LOG_KEY, SessionState.get().getSessionId()); - ThreadContext.put(QUERYID_LOG_KEY, confOverlay.get(HiveConf.ConfVars.HIVEQUERYID.varname)); + LogUtils.registerLoggingContext(queryState.getConf()); } /** * Unregister logging context */ protected void unregisterLoggingContext() { - ThreadContext.clearAll(); + MDC.clear(); } /** @@ -301,7 +252,6 @@ protected void unregisterLoggingContext() { */ protected void afterRun() { unregisterLoggingContext(); - unregisterOperationLog(); } /** diff --git a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java index 3f8f68e..6763877 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java +++ b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Schema; +import org.apache.hadoop.hive.ql.log.LogDivertAppender; import org.apache.hadoop.hive.ql.session.OperationLog; import org.apache.hive.service.AbstractService; import org.apache.hive.service.cli.FetchOrientation; @@ -46,11 +47,6 @@ import org.apache.hive.service.cli.RowSetFactory; import org.apache.hive.service.cli.TableSchema; import org.apache.hive.service.cli.session.HiveSession; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.core.Appender; -import org.apache.logging.log4j.core.LoggerContext; -import org.apache.logging.log4j.core.config.Configuration; -import org.apache.logging.log4j.core.config.LoggerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,8 +71,7 @@ public OperationManager() { @Override public synchronized void init(HiveConf hiveConf) { if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) { - initOperationLogCapture(hiveConf.getVar( - HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL)); + LogDivertAppender.registerRoutingAppender(hiveConf); } else { LOG.debug("Operation level logging is turned off"); } @@ -97,17 +92,6 @@ public synchronized void stop() { super.stop(); } - private void initOperationLogCapture(String loggingMode) { - // Register another Appender (with the same layout) that talks to us. 
- Appender ap = LogDivertAppender.createInstance(this, OperationLog.getLoggingLevel(loggingMode)); - LoggerContext context = (LoggerContext) LogManager.getContext(false); - Configuration configuration = context.getConfiguration(); - LoggerConfig loggerConfig = configuration.getLoggerConfig(LoggerFactory.getLogger(getClass()).getName()); - loggerConfig.addAppender(ap, null, null); - context.updateLoggers(); - ap.start(); - } - public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession, String statement, Map confOverlay, boolean runAsync, long queryTimeout) throws HiveSQLException { @@ -361,11 +345,6 @@ private Schema getLogSchema() { return Collections.unmodifiableCollection(handleToOperation.values()); } - - public OperationLog getOperationLogByThread() { - return OperationLog.getCurrentOperationLog(); - } - public List removeExpiredOperations(OperationHandle[] handles) { List removed = new ArrayList(); for (OperationHandle handle : handles) { diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index f41092e..de42e90 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -338,8 +338,6 @@ public Object run() throws HiveSQLException { // TODO: can this result in cross-thread reuse of session state? SessionState.setCurrentSessionState(parentSessionState); PerfLogger.setPerfLogger(parentPerfLogger); - // Set current OperationLog in this async thread for keeping on saving query log. - registerCurrentOperationLog(); registerLoggingContext(); try { if (asyncPrepare) { @@ -352,7 +350,6 @@ public Object run() throws HiveSQLException { LOG.error("Error running hive query: ", e); } finally { unregisterLoggingContext(); - unregisterOperationLog(); } return null; } @@ -393,18 +390,6 @@ private UserGroupInformation getCurrentUGI() throws HiveSQLException { } } - private void registerCurrentOperationLog() { - if (isOperationLogEnabled) { - if (operationLog == null) { - LOG.warn("Failed to get current OperationLog object of Operation: " + - getHandle().getHandleIdentifier()); - isOperationLogEnabled = false; - return; - } - OperationLog.setCurrentOperationLog(operationLog); - } - } - private synchronized void cleanup(OperationState state) throws HiveSQLException { setState(state);
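
---
Notes (reviewer sketches; the code below is illustrative and not part of the patch):

The patch replaces the per-thread OperationLog plumbing with Log4j2's thread context map (MDC): LogUtils.registerLoggingContext() stores sessionId, queryId, and the operation log level in the context, and setting DefaultThreadContextMap.INHERITABLE_MAP before Configurator.initialize() makes child threads inherit the parent's context map, so logs from task threads spawned during a query still carry the queryId and route to the query's file. A minimal standalone sketch of that inheritance, assuming only log4j-api on the classpath and that INHERITABLE_MAP resolves to the system property "isThreadContextMapInheritable" (its value in Log4j 2.x):

```java
import org.apache.logging.log4j.ThreadContext;

public class ContextInheritanceDemo {
  public static void main(String[] args) throws InterruptedException {
    // Must be set before the ThreadContext map is first touched, which is
    // why the patch sets it before Configurator.initialize().
    System.setProperty("isThreadContextMapInheritable", "true");
    ThreadContext.put("queryId", "demo_query_1");

    Thread child = new Thread(() ->
        // With an inheritable map, a thread created after the put() sees the
        // parent's queryId, so a RoutingAppender keyed on ${ctx:queryId}
        // keeps writing its events to the same per-query file.
        System.out.println("child sees queryId=" + ThreadContext.get("queryId")));
    child.start();
    child.join();
  }
}
```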
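On the HiveServer2 side, each operation then only has to populate and clear the MDC around its run; the routing appender does the rest. A hedged sketch of the calling pattern (LogUtils.registerLoggingContext and the ConfVars are from the patch; the class, logger, and query id value are illustrative):

```java
import org.apache.hadoop.hive.common.LogUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class OperationLoggingSketch {
  private static final Logger LOG = LoggerFactory.getLogger(OperationLoggingSketch.class);

  public static void runWithOperationLog(HiveConf conf) {
    conf.setVar(HiveConf.ConfVars.HIVEQUERYID, "hive_demo_query");  // illustrative id
    // Puts sessionId/queryId (and operationLogLevel when operation logging
    // is enabled) into the MDC, mirroring Operation.registerLoggingContext().
    LogUtils.registerLoggingContext(conf);
    try {
      // Routed to <hive.server2.logging.operation.log.location>/<sessionId>/<queryId>.
      LOG.info("query started");
    } finally {
      MDC.clear();  // mirrors Operation.unregisterLoggingContext()
    }
  }
}
```

Threads that never set a queryId fall through to the default route: their ${ctx:queryId} lookup resolves to the literal string "${ctx:queryId}", which is exactly the default route's key in registerRoutingAppender, so those events go to the NullAppender rather than a stray query file.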
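NameFilter folds the blacklist/whitelist decision into a single comparison: in VERBOSE mode the pattern is an exclusion list (deny when it matches), while EXECUTION and PERFORMANCE use inclusion lists (deny when it does not match). A restatement of just that predicate:

```java
// Same decision NameFilter.filter() makes, isolated for clarity.
static boolean deny(boolean excludeMatches, boolean isMatch) {
  // excludeMatches == true  -> blacklist: deny exactly when the logger matched
  // excludeMatches == false -> whitelist: deny exactly when it did not match
  return excludeMatches == isMatch;
}
```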