diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 4cd3b32..af33e81 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1603,8 +1603,14 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) { "${system:java.io.tmpdir}" + File.separator + "${system:user.name}" + File.separator + "operation_logs", "Top level directory where operation logs are stored if logging functionality is enabled"), - HIVE_SERVER2_LOGGING_OPERATION_VERBOSE("hive.server2.logging.operation.verbose", false, - "When true, HS2 operation logs available for clients will be verbose"), + HIVE_SERVER2_LOGGING_OPERATION_LEVEL("hive.server2.logging.operation.level", "EXECUTION", + new StringSet("NONE", "EXECUTION", "PERFORMANCE", "VERBOSE"), + "HS2 operation logging mode available to clients to be set at session level.\n" + + "For this to work, hive.server2.logging.operation.enabled should be set to true.\n" + + " NONE: Ignore any logging\n" + + " EXECUTION: Log completion of tasks\n" + + " PERFORMANCE: Execution + Performance logs \n" + + " VERBOSE: All logs" ), // logging configuration HIVE_LOG4J_FILE("hive.log4j.file", "", "Hive log4j configuration file.\n" + diff --git a/data/conf/hive-log4j.properties b/data/conf/hive-log4j.properties index e91388f..023e3c2 100644 --- a/data/conf/hive-log4j.properties +++ b/data/conf/hive-log4j.properties @@ -91,7 +91,7 @@ log4j.logger.org.apache.zookeeper.server.NIOServerCnxn=WARN,DRFA log4j.logger.org.apache.zookeeper.ClientCnxn=WARN,DRFA log4j.logger.org.apache.zookeeper.ClientCnxnSocket=WARN,DRFA log4j.logger.org.apache.zookeeper.ClientCnxnSocketNIO=WARN,DRFA -log4j.logger.org.apache.hadoop.hive.ql.log.PerfLogger=WARN,DRFA +log4j.logger.org.apache.hadoop.hive.ql.log.PerfLogger=${hive.ql.log.PerfLogger.level} log4j.logger.org.apache.hadoop.hive.ql.exec.Operator=INFO,DRFA log4j.logger.org.apache.hadoop.hive.serde2.lazy=INFO,DRFA log4j.logger.org.apache.hadoop.hive.metastore.ObjectStore=INFO,DRFA diff --git a/data/conf/hive-site.xml b/data/conf/hive-site.xml index ed3ee42..8f7fb28 100644 --- a/data/conf/hive-site.xml +++ b/data/conf/hive-site.xml @@ -240,6 +240,11 @@ Using dummy param to test server specific configuration + + hive.ql.log.PerfLogger.level + WARN,DRFA + Used to change the perflogger level + hive.fetch.task.conversion diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java index 2c85877..57dcc2a 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java @@ -106,7 +106,7 @@ public TestJdbcDriver2() { public static void setUpBeforeClass() throws SQLException, ClassNotFoundException{ Class.forName(driverName); Connection con1 = getConnection("default"); - System.setProperty(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_VERBOSE.varname, "" + true); + System.setProperty(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL.varname, "verbose"); Statement stmt1 = con1.createStatement(); assertNotNull("Statement is null", stmt1); diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPI.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPI.java new file mode 100644 index 0000000..21487b0 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingAPI.java @@ -0,0 +1,379 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.service.cli.operation; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hive.jdbc.miniHS2.MiniHS2; +import org.apache.hive.service.cli.CLIServiceClient; +import org.apache.hive.service.cli.FetchOrientation; +import org.apache.hive.service.cli.FetchType; +import org.apache.hive.service.cli.HiveSQLException; +import org.apache.hive.service.cli.OperationHandle; +import org.apache.hive.service.cli.OperationState; +import org.apache.hive.service.cli.OperationStatus; +import org.apache.hive.service.cli.RowSet; +import org.apache.hive.service.cli.SessionHandle; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * TestOperationLoggingAPI + * Test the FetchResults of TFetchType.LOG in thrift level. + */ +public class TestOperationLoggingAPI { + private static HiveConf hiveConf; + private final String tableName = "testOperationLoggingAPI_table"; + private File dataFile; + private CLIServiceClient client; + private static MiniHS2 miniHS2 = null; + private static Map confOverlay; + private SessionHandle sessionHandle; + private final String sql = "select * from " + tableName; + private final String sqlCntStar = "select count(*) from " + tableName; + private final String[] expectedLogs = { + "Parsing command", + "Parse Completed", + "Starting Semantic Analysis", + "Semantic Analysis Completed", + "Starting command" + }; + private final String[] expectedLogsExecution = { + "Number of reduce tasks determined at compile time", + "number of splits", + "Submitting tokens for job", + "Ended Job" + }; + private final String[] expectedLogsPerformance = { + "", + "", + "", + "" + }; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + hiveConf = new HiveConf(); + hiveConf.set(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL.varname, "verbose"); + // We need to set the below parameter to test performance level logging + hiveConf.set("hive.ql.log.PerfLogger.level", "INFO,DRFA"); + miniHS2 = new MiniHS2(hiveConf); + confOverlay = new HashMap(); + confOverlay.put(ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); + miniHS2.start(confOverlay); + } + + /** + * Open a session, and create a table for cases usage + * @throws Exception + */ + @Before + public void setUp() throws Exception { + dataFile = new File(hiveConf.get("test.data.files"), "kv1.txt"); + client = miniHS2.getServiceClient(); + sessionHandle = setupSession(); + } + + @After + public void tearDown() throws Exception { + // Cleanup + String queryString = "DROP TABLE " + tableName; + client.executeStatement(sessionHandle, queryString, null); + + client.closeSession(sessionHandle); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + miniHS2.stop(); + } + + @Test + public void testFetchResultsOfLog() throws Exception { + // verify whether the sql operation log is generated and fetch correctly. + OperationHandle operationHandle = client.executeStatement(sessionHandle, sql, null); + RowSet rowSetLog = client.fetchResults(operationHandle, FetchOrientation.FETCH_FIRST, 1000, + FetchType.LOG); + verifyFetchedLog(rowSetLog, expectedLogs); + } + + @Test + public void testFetchResultsOfLogAsync() throws Exception { + // verify whether the sql operation log is generated and fetch correctly in async mode. + OperationHandle operationHandle = client.executeStatementAsync(sessionHandle, sql, null); + + // Poll on the operation status till the query is completed + boolean isQueryRunning = true; + long pollTimeout = System.currentTimeMillis() + 100000; + OperationStatus opStatus; + OperationState state = null; + RowSet rowSetAccumulated = null; + StringBuilder logs = new StringBuilder(); + + while (isQueryRunning) { + // Break if polling times out + if (System.currentTimeMillis() > pollTimeout) { + break; + } + opStatus = client.getOperationStatus(operationHandle); + Assert.assertNotNull(opStatus); + state = opStatus.getState(); + + rowSetAccumulated = client.fetchResults(operationHandle, FetchOrientation.FETCH_NEXT, 2000, + FetchType.LOG); + for (Object[] row : rowSetAccumulated) { + logs.append(row[0]); + } + + if (state == OperationState.CANCELED || + state == OperationState.CLOSED || + state == OperationState.FINISHED || + state == OperationState.ERROR) { + isQueryRunning = false; + } + Thread.sleep(10); + } + // The sql should be completed now. + Assert.assertEquals("Query should be finished", OperationState.FINISHED, state); + + // Verify the accumulated logs + verifyFetchedLogPost(logs.toString(), expectedLogs, true); + + // Verify the fetched logs from the beginning of the log file + RowSet rowSet = client.fetchResults(operationHandle, FetchOrientation.FETCH_FIRST, 2000, + FetchType.LOG); + verifyFetchedLog(rowSet, expectedLogs); + } + + @Test + public void testFetchResultsOfLogWithOrientation() throws Exception { + // (FETCH_FIRST) execute a sql, and fetch its sql operation log as expected value + OperationHandle operationHandle = client.executeStatement(sessionHandle, sql, null); + RowSet rowSetLog = client.fetchResults(operationHandle, FetchOrientation.FETCH_FIRST, 1000, + FetchType.LOG); + int expectedLogLength = rowSetLog.numRows(); + + // (FETCH_NEXT) execute the same sql again, + // and fetch the sql operation log with FETCH_NEXT orientation + OperationHandle operationHandleWithOrientation = client.executeStatement(sessionHandle, sql, + null); + RowSet rowSetLogWithOrientation; + int logLength = 0; + int maxRows = calculateProperMaxRows(expectedLogLength); + do { + rowSetLogWithOrientation = client.fetchResults(operationHandleWithOrientation, + FetchOrientation.FETCH_NEXT, maxRows, FetchType.LOG); + logLength += rowSetLogWithOrientation.numRows(); + } while (rowSetLogWithOrientation.numRows() == maxRows); + Assert.assertEquals(expectedLogLength, logLength); + + // (FETCH_FIRST) fetch again from the same operation handle with FETCH_FIRST orientation + rowSetLogWithOrientation = client.fetchResults(operationHandleWithOrientation, + FetchOrientation.FETCH_FIRST, 1000, FetchType.LOG); + verifyFetchedLog(rowSetLogWithOrientation, expectedLogs); + } + + @Test + public void testFetchResultsOfLogWithVerboseMode() throws Exception { + String queryString = "set hive.server2.logging.operation.level=verbose"; + client.executeStatement(sessionHandle, queryString, null); + // verify whether the sql operation log is generated and fetch correctly. + OperationHandle operationHandle = client.executeStatement(sessionHandle, sqlCntStar, null); + RowSet rowSetLog = client.fetchResults(operationHandle, FetchOrientation.FETCH_FIRST, 1000, + FetchType.LOG); + // Verbose Logs should contain everything, including execution and performance + verifyFetchedLog(rowSetLog, expectedLogs); + verifyFetchedLog(rowSetLog, expectedLogsExecution); + verifyFetchedLog(rowSetLog, expectedLogsPerformance); + } + + @Test + public void testFetchResultsOfLogWithPerformanceMode() throws Exception { + try { + String queryString = "set hive.server2.logging.operation.level=performance"; + client.executeStatement(sessionHandle, queryString, null); + // verify whether the sql operation log is generated and fetch correctly. + OperationHandle operationHandle = client.executeStatement(sessionHandle, sqlCntStar, null); + RowSet rowSetLog = client.fetchResults(operationHandle, FetchOrientation.FETCH_FIRST, 1000, + FetchType.LOG); + // rowSetLog should contain execution as well as performance logs + verifyFetchedLog(rowSetLog, expectedLogsExecution); + verifyFetchedLog(rowSetLog, expectedLogsPerformance); + verifyMissingContentsInFetchedLog(rowSetLog, expectedLogs); + } finally { + // Restore everything to default setup to avoid discrepancy between junit test runs + String queryString2 = "set hive.server2.logging.operation.level=verbose"; + client.executeStatement(sessionHandle, queryString2, null); + } + } + + @Test + public void testFetchResultsOfLogWithExecutionMode() throws Exception { + try { + String queryString = "set hive.server2.logging.operation.level=execution"; + client.executeStatement(sessionHandle, queryString, null); + // verify whether the sql operation log is generated and fetch correctly. + OperationHandle operationHandle = client.executeStatement(sessionHandle, sqlCntStar, null); + RowSet rowSetLog = client.fetchResults(operationHandle, FetchOrientation.FETCH_FIRST, 1000, + FetchType.LOG); + verifyFetchedLog(rowSetLog, expectedLogsExecution); + verifyMissingContentsInFetchedLog(rowSetLog, expectedLogsPerformance); + verifyMissingContentsInFetchedLog(rowSetLog, expectedLogs); + } finally { + // Restore everything to default setup to avoid discrepancy between junit test runs + String queryString2 = "set hive.server2.logging.operation.level=verbose"; + client.executeStatement(sessionHandle, queryString2, null); + } + } + + @Test + public void testFetchResultsOfLogWithNoneMode() throws Exception { + try { + String queryString = "set hive.server2.logging.operation.level=none"; + client.executeStatement(sessionHandle, queryString, null); + // verify whether the sql operation log is generated and fetch correctly. + OperationHandle operationHandle = client.executeStatement(sessionHandle, sqlCntStar, null); + RowSet rowSetLog = client.fetchResults(operationHandle, FetchOrientation.FETCH_FIRST, 1000, + FetchType.LOG); + // We should not get any rows. + assert(rowSetLog.numRows() == 0); + } finally { + // Restore everything to default setup to avoid discrepancy between junit test runs + String queryString2 = "set hive.server2.logging.operation.level=verbose"; + client.executeStatement(sessionHandle, queryString2, null); + } + } + + @Test + public void testFetchResultsOfLogCleanup() throws Exception { + // Verify cleanup functionality. + // Open a new session, since this case needs to close the session in the end. + SessionHandle sessionHandleCleanup = setupSession(); + + // prepare + OperationHandle operationHandle = client.executeStatement(sessionHandleCleanup, sql, null); + RowSet rowSetLog = client.fetchResults(operationHandle, FetchOrientation.FETCH_FIRST, 1000, + FetchType.LOG); + verifyFetchedLog(rowSetLog, expectedLogs); + + File sessionLogDir = new File( + hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION) + + File.separator + sessionHandleCleanup.getHandleIdentifier()); + File operationLogFile = new File(sessionLogDir, operationHandle.getHandleIdentifier().toString()); + + // check whether exception is thrown when fetching log from a closed operation. + client.closeOperation(operationHandle); + try { + client.fetchResults(operationHandle, FetchOrientation.FETCH_FIRST, 1000, FetchType.LOG); + Assert.fail("Fetch should fail"); + } catch (HiveSQLException e) { + Assert.assertTrue(e.getMessage().contains("Invalid OperationHandle:")); + } + + // check whether operation log file is deleted. + if (operationLogFile.exists()) { + Assert.fail("Operation log file should be deleted."); + } + + // check whether session log dir is deleted after session is closed. + client.closeSession(sessionHandleCleanup); + if (sessionLogDir.exists()) { + Assert.fail("Session log dir should be deleted."); + } + } + + private SessionHandle setupSession() throws Exception { + // Open a session + SessionHandle sessionHandle = client.openSession(null, null, null); + + // Change lock manager to embedded mode + String queryString = "SET hive.lock.manager=" + + "org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager"; + client.executeStatement(sessionHandle, queryString, null); + + // Drop the table if it exists + queryString = "DROP TABLE IF EXISTS " + tableName; + client.executeStatement(sessionHandle, queryString, null); + + // Create a test table + queryString = "create table " + tableName + " (key int, value string)"; + client.executeStatement(sessionHandle, queryString, null); + + // Load data + queryString = "load data local inpath '" + dataFile + "' into table " + tableName; + client.executeStatement(sessionHandle, queryString, null); + + // Precondition check: verify whether the table is created and data is fetched correctly. + OperationHandle operationHandle = client.executeStatement(sessionHandle, sql, null); + RowSet rowSetResult = client.fetchResults(operationHandle); + Assert.assertEquals(500, rowSetResult.numRows()); + Assert.assertEquals(238, rowSetResult.iterator().next()[0]); + Assert.assertEquals("val_238", rowSetResult.iterator().next()[1]); + + return sessionHandle; + } + + // Since the log length of the sql operation may vary during HIVE dev, calculate a proper maxRows. + private int calculateProperMaxRows(int len) { + if (len < 10) { + return 1; + } else if (len < 100) { + return 10; + } else { + return 100; + } + } + + private String verifyFetchedLogPre(RowSet rowSet, String[] el) { + StringBuilder stringBuilder = new StringBuilder(); + + for (Object[] row : rowSet) { + stringBuilder.append(row[0]); + } + + return stringBuilder.toString(); + } + + private void verifyFetchedLog(RowSet rowSet, String[] el) { + String logs = verifyFetchedLogPre(rowSet, el); + verifyFetchedLogPost(logs, el, true); + } + + private void verifyMissingContentsInFetchedLog(RowSet rowSet, String[] el) { + String logs = verifyFetchedLogPre(rowSet, el); + verifyFetchedLogPost(logs, el, false); + } + + private void verifyFetchedLogPost(String logs, String[] el, boolean contains) { + for (String log : el) { + if (contains) { + Assert.assertTrue("Checking for presence of " + log, logs.contains(log)); + } else { + Assert.assertFalse("Checking for absence of " + log, logs.contains(log)); + } + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java b/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java index 571bbbc..5b6df2b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/OperationLog.java @@ -20,6 +20,7 @@ import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.io.IOUtils; import java.io.*; @@ -36,10 +37,38 @@ private final String operationName; private final LogFile logFile; + private LoggingLevel opLoggingLevel = LoggingLevel.UNKNOWN; - public OperationLog(String name, File file) throws FileNotFoundException{ + public static enum LoggingLevel { + NONE, EXECUTION, PERFORMANCE, VERBOSE, UNKNOWN + } + + public OperationLog(String name, File file, HiveConf hiveConf) throws FileNotFoundException { operationName = name; logFile = new LogFile(file); + + if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) { + String logLevel = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL); + opLoggingLevel = getLoggingLevel(logLevel); + } + } + + public static LoggingLevel getLoggingLevel (String mode) { + if (mode.equalsIgnoreCase("none")) { + return LoggingLevel.NONE; + } else if (mode.equalsIgnoreCase("execution")) { + return LoggingLevel.EXECUTION; + } else if (mode.equalsIgnoreCase("verbose")) { + return LoggingLevel.VERBOSE; + } else if (mode.equalsIgnoreCase("performance")) { + return LoggingLevel.PERFORMANCE; + } else { + return LoggingLevel.UNKNOWN; + } + } + + public LoggingLevel getOpLoggingLevel() { + return opLoggingLevel; } /** diff --git a/service/src/java/org/apache/hive/service/cli/CLIServiceUtils.java b/service/src/java/org/apache/hive/service/cli/CLIServiceUtils.java index 876ade8..9d64b10 100644 --- a/service/src/java/org/apache/hive/service/cli/CLIServiceUtils.java +++ b/service/src/java/org/apache/hive/service/cli/CLIServiceUtils.java @@ -18,6 +18,9 @@ package org.apache.hive.service.cli; +import org.apache.log4j.Layout; +import org.apache.log4j.PatternLayout; + /** * CLIServiceUtils. * @@ -26,6 +29,10 @@ private static final char SEARCH_STRING_ESCAPE = '\\'; + public static final Layout verboseLayout = new PatternLayout( + "%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n"); + public static final Layout nonVerboseLayout = new PatternLayout( + "%-5p : %m%n"); /** * Convert a SQL search pattern into an equivalent Java Regex. diff --git a/service/src/java/org/apache/hive/service/cli/operation/LogDivertAppender.java b/service/src/java/org/apache/hive/service/cli/operation/LogDivertAppender.java index 4737785..70340bd 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/LogDivertAppender.java +++ b/service/src/java/org/apache/hive/service/cli/operation/LogDivertAppender.java @@ -18,10 +18,16 @@ package org.apache.hive.service.cli.operation; import java.io.CharArrayWriter; +import java.util.Enumeration; import java.util.regex.Pattern; import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.session.OperationLog; +import org.apache.hadoop.hive.ql.session.OperationLog.LoggingLevel; +import org.apache.hive.service.cli.CLIServiceUtils; +import org.apache.log4j.Appender; +import org.apache.log4j.ConsoleAppender; import org.apache.log4j.Layout; import org.apache.log4j.Logger; import org.apache.log4j.WriterAppender; @@ -36,6 +42,8 @@ public class LogDivertAppender extends WriterAppender { private static final Logger LOG = Logger.getLogger(LogDivertAppender.class.getName()); private final OperationManager operationManager; + private boolean isVerbose; + private Layout verboseLayout; /** * A log filter that filters messages coming from the logger with the given names. @@ -45,18 +53,71 @@ * White list filter is used for less verbose log collection */ private static class NameFilter extends Filter { - private final Pattern namePattern; - private final boolean excludeMatches; + private Pattern namePattern; + private LoggingLevel loggingMode; + private OperationManager operationManager; - public NameFilter(boolean isExclusionFilter, String [] loggerNames) { - this.excludeMatches = isExclusionFilter; - String matchRegex = Joiner.on("|").join(loggerNames); - this.namePattern = Pattern.compile(matchRegex); + /* Patterns that are excluded in verbose logging level. + * Filter out messages coming from log processing classes, or we'll run an infinite loop. + */ + private static final Pattern verboseExcludeNamePattern = Pattern.compile(Joiner.on("|"). + join(new String[] {LOG.getName(), OperationLog.class.getName(), + OperationManager.class.getName()})); + + /* Patterns that are included in execution logging level. + * In execution mode, show only select logger messages. + */ + private static final Pattern executionIncludeNamePattern = Pattern.compile(Joiner.on("|"). + join(new String[] {"org.apache.hadoop.mapreduce.JobSubmitter", + "org.apache.hadoop.mapreduce.Job", "SessionState", Task.class.getName(), + "org.apache.hadoop.hive.ql.exec.spark.status.SparkJobMonitor"})); + + /* Patterns that are included in performance logging level. + * In performance mode, show execution and performance logger messages. + */ + private static final Pattern performanceIncludeNamePattern = Pattern.compile( + executionIncludeNamePattern.pattern() + "|" + PerfLogger.class.getName()); + + private void setCurrentNamePattern(OperationLog.LoggingLevel mode) { + if (mode == OperationLog.LoggingLevel.VERBOSE) { + this.namePattern = verboseExcludeNamePattern; + } else if (mode == OperationLog.LoggingLevel.EXECUTION) { + this.namePattern = executionIncludeNamePattern; + } else if (mode == OperationLog.LoggingLevel.PERFORMANCE) { + this.namePattern = performanceIncludeNamePattern; + } + } + + public NameFilter( + OperationLog.LoggingLevel loggingMode, OperationManager op) { + this.operationManager = op; + this.loggingMode = loggingMode; + setCurrentNamePattern(loggingMode); } @Override public int decide(LoggingEvent ev) { + OperationLog log = operationManager.getOperationLogByThread(); + boolean excludeMatches = (loggingMode == OperationLog.LoggingLevel.VERBOSE); + + if (log == null) { + return Filter.DENY; + } + + OperationLog.LoggingLevel currentLoggingMode = log.getOpLoggingLevel(); + // If logging is disabled, deny everything. + if (currentLoggingMode == OperationLog.LoggingLevel.NONE) { + return Filter.DENY; + } + // Look at the current session's setting + // and set the pattern and excludeMatches accordingly. + if (currentLoggingMode != loggingMode) { + loggingMode = currentLoggingMode; + setCurrentNamePattern(loggingMode); + } + boolean isMatch = namePattern.matcher(ev.getLoggerName()).matches(); + if (excludeMatches == isMatch) { // Deny if this is black-list filter (excludeMatches = true) and it // matched @@ -70,25 +131,61 @@ public int decide(LoggingEvent ev) { /** This is where the log message will go to */ private final CharArrayWriter writer = new CharArrayWriter(); - public LogDivertAppender(Layout layout, OperationManager operationManager, boolean isVerbose) { - setLayout(layout); + private void setLayout (boolean isVerbose, Layout lo) { + if (isVerbose) { + if (lo == null) { + lo = CLIServiceUtils.verboseLayout; + LOG.info("Cannot find a Layout from a ConsoleAppender. Using default Layout pattern."); + } + } else { + lo = CLIServiceUtils.nonVerboseLayout; + } + setLayout(lo); + } + + private void initLayout(boolean isVerbose) { + // There should be a ConsoleAppender. Copy its Layout. + Logger root = Logger.getRootLogger(); + Layout layout = null; + + Enumeration appenders = root.getAllAppenders(); + while (appenders.hasMoreElements()) { + Appender ap = (Appender) appenders.nextElement(); + if (ap.getClass().equals(ConsoleAppender.class)) { + layout = ap.getLayout(); + break; + } + } + setLayout(isVerbose, layout); + } + + public LogDivertAppender(OperationManager operationManager, + OperationLog.LoggingLevel loggingMode) { + isVerbose = (loggingMode == OperationLog.LoggingLevel.VERBOSE); + initLayout(isVerbose); setWriter(writer); setName("LogDivertAppender"); this.operationManager = operationManager; + this.verboseLayout = isVerbose ? layout : CLIServiceUtils.verboseLayout; + addFilter(new NameFilter(loggingMode, operationManager)); + } - if (isVerbose) { - // Filter out messages coming from log processing classes, or we'll run an - // infinite loop. - String[] exclLoggerNames = { LOG.getName(), OperationLog.class.getName(), - OperationManager.class.getName() }; - addFilter(new NameFilter(true, exclLoggerNames)); - } else { - // in non verbose mode, show only select logger messages - String[] inclLoggerNames = { "org.apache.hadoop.mapreduce.JobSubmitter", - "org.apache.hadoop.mapreduce.Job", "SessionState", Task.class.getName(), - "org.apache.hadoop.hive.ql.exec.spark.status.SparkJobMonitor"}; - addFilter(new NameFilter(false, inclLoggerNames)); + @Override + public void doAppend(LoggingEvent event) { + OperationLog log = operationManager.getOperationLogByThread(); + + // Set current layout depending on the verbose/non-verbose mode. + if (log != null) { + boolean isCurrModeVerbose = (log.getOpLoggingLevel() == OperationLog.LoggingLevel.VERBOSE); + + // If there is a logging level change from verbose->non-verbose or vice-versa since + // the last subAppend call, change the layout to preserve consistency. + if (isCurrModeVerbose != isVerbose) { + isVerbose = isCurrModeVerbose; + setLayout(isVerbose, verboseLayout); + } } + super.doAppend(event); } /** diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java index d85db8a..19153b6 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java @@ -210,7 +210,7 @@ protected void createOperationLog() { // create OperationLog object with above log file try { - operationLog = new OperationLog(opHandle.toString(), operationLogFile); + operationLog = new OperationLog(opHandle.toString(), operationLogFile, parentSession.getHiveConf()); } catch (FileNotFoundException e) { LOG.warn("Unable to instantiate OperationLog object for operation: " + opHandle, e); diff --git a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java index 6cae8a8..92c340a 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java +++ b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java @@ -20,7 +20,6 @@ import java.sql.SQLException; import java.util.ArrayList; -import java.util.Enumeration; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -42,10 +41,7 @@ import org.apache.hive.service.cli.TableSchema; import org.apache.hive.service.cli.session.HiveSession; import org.apache.log4j.Appender; -import org.apache.log4j.ConsoleAppender; -import org.apache.log4j.Layout; import org.apache.log4j.Logger; -import org.apache.log4j.PatternLayout; /** * OperationManager. @@ -54,7 +50,6 @@ public class OperationManager extends AbstractService { private final Log LOG = LogFactory.getLog(OperationManager.class.getName()); - private HiveConf hiveConf; private final Map handleToOperation = new HashMap(); @@ -64,10 +59,9 @@ public OperationManager() { @Override public synchronized void init(HiveConf hiveConf) { - this.hiveConf = hiveConf; if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) { - boolean isVerbose = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_VERBOSE); - initOperationLogCapture(isVerbose); + initOperationLogCapture(hiveConf.getVar( + HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL)); } else { LOG.debug("Operation level logging is turned off"); } @@ -86,34 +80,10 @@ public synchronized void stop() { super.stop(); } - private void initOperationLogCapture(boolean isVerbose) { - // There should be a ConsoleAppender. Copy its Layout. - Logger root = Logger.getRootLogger(); - Layout layout = null; - - Enumeration appenders = root.getAllAppenders(); - while (appenders.hasMoreElements()) { - Appender ap = (Appender) appenders.nextElement(); - if (ap.getClass().equals(ConsoleAppender.class)) { - layout = ap.getLayout(); - break; - } - } - - final String VERBOSE_PATTERN = "%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n"; - final String NONVERBOSE_PATTERN = "%-5p : %m%n"; - - if (isVerbose) { - if (layout == null) { - layout = new PatternLayout(VERBOSE_PATTERN); - LOG.info("Cannot find a Layout from a ConsoleAppender. Using default Layout pattern."); - } - } else { - layout = new PatternLayout(NONVERBOSE_PATTERN); - } + private void initOperationLogCapture(String loggingMode) { // Register another Appender (with the same layout) that talks to us. - Appender ap = new LogDivertAppender(layout, this, isVerbose); - root.addAppender(ap); + Appender ap = new LogDivertAppender(this, OperationLog.getLoggingLevel(loggingMode)); + Logger.getRootLogger().addAppender(ap); } public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession, diff --git a/service/src/test/org/apache/hive/service/cli/operation/TestOperationLoggingAPI.java b/service/src/test/org/apache/hive/service/cli/operation/TestOperationLoggingAPI.java deleted file mode 100644 index 42bdf21..0000000 --- a/service/src/test/org/apache/hive/service/cli/operation/TestOperationLoggingAPI.java +++ /dev/null @@ -1,268 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hive.service.cli.operation; - -import java.io.File; - -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hive.service.cli.FetchOrientation; -import org.apache.hive.service.cli.FetchType; -import org.apache.hive.service.cli.HiveSQLException; -import org.apache.hive.service.cli.OperationHandle; -import org.apache.hive.service.cli.OperationState; -import org.apache.hive.service.cli.OperationStatus; -import org.apache.hive.service.cli.RowSet; -import org.apache.hive.service.cli.SessionHandle; -import org.apache.hive.service.cli.thrift.EmbeddedThriftBinaryCLIService; -import org.apache.hive.service.cli.thrift.ThriftCLIServiceClient; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -/** - * TestOperationLoggingAPI - * Test the FetchResults of TFetchType.LOG in thrift level. - */ -public class TestOperationLoggingAPI { - private static HiveConf hiveConf; - private final String tableName = "testOperationLoggingAPI_table"; - private File dataFile; - private ThriftCLIServiceClient client; - private SessionHandle sessionHandle; - private final String sql = "select * from " + tableName; - private final String[] expectedLogs = { - "Parsing command", - "Parse Completed", - "Starting Semantic Analysis", - "Semantic Analysis Completed", - "Starting command" - }; - - @BeforeClass - public static void setUpBeforeClass() { - hiveConf = new HiveConf(); - hiveConf.setBoolean(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_VERBOSE.varname, true); - } - - /** - * Start embedded mode, open a session, and create a table for cases usage - * @throws Exception - */ - @Before - public void setUp() throws Exception { - dataFile = new File(hiveConf.get("test.data.files"), "kv1.txt"); - EmbeddedThriftBinaryCLIService service = new EmbeddedThriftBinaryCLIService(); - service.init(hiveConf); - client = new ThriftCLIServiceClient(service); - sessionHandle = setupSession(); - } - - @After - public void tearDown() throws Exception { - // Cleanup - String queryString = "DROP TABLE " + tableName; - client.executeStatement(sessionHandle, queryString, null); - - client.closeSession(sessionHandle); - } - - @Test - public void testFetchResultsOfLog() throws Exception { - // verify whether the sql operation log is generated and fetch correctly. - OperationHandle operationHandle = client.executeStatement(sessionHandle, sql, null); - RowSet rowSetLog = client.fetchResults(operationHandle, FetchOrientation.FETCH_FIRST, 1000, - FetchType.LOG); - verifyFetchedLog(rowSetLog); - } - - @Test - public void testFetchResultsOfLogAsync() throws Exception { - // verify whether the sql operation log is generated and fetch correctly in async mode. - OperationHandle operationHandle = client.executeStatementAsync(sessionHandle, sql, null); - - // Poll on the operation status till the query is completed - boolean isQueryRunning = true; - long pollTimeout = System.currentTimeMillis() + 100000; - OperationStatus opStatus; - OperationState state = null; - RowSet rowSetAccumulated = null; - StringBuilder logs = new StringBuilder(); - - while (isQueryRunning) { - // Break if polling times out - if (System.currentTimeMillis() > pollTimeout) { - break; - } - opStatus = client.getOperationStatus(operationHandle); - Assert.assertNotNull(opStatus); - state = opStatus.getState(); - - rowSetAccumulated = client.fetchResults(operationHandle, FetchOrientation.FETCH_NEXT, 1000, - FetchType.LOG); - for (Object[] row : rowSetAccumulated) { - logs.append(row[0]); - } - - if (state == OperationState.CANCELED || - state == OperationState.CLOSED || - state == OperationState.FINISHED || - state == OperationState.ERROR) { - isQueryRunning = false; - } - Thread.sleep(10); - } - // The sql should be completed now. - Assert.assertEquals("Query should be finished", OperationState.FINISHED, state); - - // Verify the accumulated logs - verifyFetchedLog(logs.toString()); - - // Verify the fetched logs from the beginning of the log file - RowSet rowSet = client.fetchResults(operationHandle, FetchOrientation.FETCH_FIRST, 1000, - FetchType.LOG); - verifyFetchedLog(rowSet); - } - - @Test - public void testFetchResultsOfLogWithOrientation() throws Exception { - // (FETCH_FIRST) execute a sql, and fetch its sql operation log as expected value - OperationHandle operationHandle = client.executeStatement(sessionHandle, sql, null); - RowSet rowSetLog = client.fetchResults(operationHandle, FetchOrientation.FETCH_FIRST, 1000, - FetchType.LOG); - int expectedLogLength = rowSetLog.numRows(); - - // (FETCH_NEXT) execute the same sql again, - // and fetch the sql operation log with FETCH_NEXT orientation - OperationHandle operationHandleWithOrientation = client.executeStatement(sessionHandle, sql, - null); - RowSet rowSetLogWithOrientation; - int logLength = 0; - int maxRows = calculateProperMaxRows(expectedLogLength); - do { - rowSetLogWithOrientation = client.fetchResults(operationHandleWithOrientation, - FetchOrientation.FETCH_NEXT, maxRows, FetchType.LOG); - logLength += rowSetLogWithOrientation.numRows(); - } while (rowSetLogWithOrientation.numRows() == maxRows); - Assert.assertEquals(expectedLogLength, logLength); - - // (FETCH_FIRST) fetch again from the same operation handle with FETCH_FIRST orientation - rowSetLogWithOrientation = client.fetchResults(operationHandleWithOrientation, - FetchOrientation.FETCH_FIRST, 1000, FetchType.LOG); - verifyFetchedLog(rowSetLogWithOrientation); - } - - @Test - public void testFetchResultsOfLogCleanup() throws Exception { - // Verify cleanup functionality. - // Open a new session, since this case needs to close the session in the end. - SessionHandle sessionHandleCleanup = setupSession(); - - // prepare - OperationHandle operationHandle = client.executeStatement(sessionHandleCleanup, sql, null); - RowSet rowSetLog = client.fetchResults(operationHandle, FetchOrientation.FETCH_FIRST, 1000, - FetchType.LOG); - verifyFetchedLog(rowSetLog); - - File sessionLogDir = new File( - hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION) + - File.separator + sessionHandleCleanup.getHandleIdentifier()); - File operationLogFile = new File(sessionLogDir, operationHandle.getHandleIdentifier().toString()); - - // check whether exception is thrown when fetching log from a closed operation. - client.closeOperation(operationHandle); - try { - client.fetchResults(operationHandle, FetchOrientation.FETCH_FIRST, 1000, FetchType.LOG); - Assert.fail("Fetch should fail"); - } catch (HiveSQLException e) { - Assert.assertTrue(e.getMessage().contains("Invalid OperationHandle:")); - } - - // check whether operation log file is deleted. - if (operationLogFile.exists()) { - Assert.fail("Operation log file should be deleted."); - } - - // check whether session log dir is deleted after session is closed. - client.closeSession(sessionHandleCleanup); - if (sessionLogDir.exists()) { - Assert.fail("Session log dir should be deleted."); - } - } - - private SessionHandle setupSession() throws Exception { - // Open a session - SessionHandle sessionHandle = client.openSession(null, null, null); - - // Change lock manager to embedded mode - String queryString = "SET hive.lock.manager=" + - "org.apache.hadoop.hive.ql.lockmgr.EmbeddedLockManager"; - client.executeStatement(sessionHandle, queryString, null); - - // Drop the table if it exists - queryString = "DROP TABLE IF EXISTS " + tableName; - client.executeStatement(sessionHandle, queryString, null); - - // Create a test table - queryString = "create table " + tableName + " (key int, value string)"; - client.executeStatement(sessionHandle, queryString, null); - - // Load data - queryString = "load data local inpath '" + dataFile + "' into table " + tableName; - client.executeStatement(sessionHandle, queryString, null); - - // Precondition check: verify whether the table is created and data is fetched correctly. - OperationHandle operationHandle = client.executeStatement(sessionHandle, sql, null); - RowSet rowSetResult = client.fetchResults(operationHandle); - Assert.assertEquals(500, rowSetResult.numRows()); - Assert.assertEquals(238, rowSetResult.iterator().next()[0]); - Assert.assertEquals("val_238", rowSetResult.iterator().next()[1]); - - return sessionHandle; - } - - // Since the log length of the sql operation may vary during HIVE dev, calculate a proper maxRows. - private int calculateProperMaxRows(int len) { - if (len < 10) { - return 1; - } else if (len < 100) { - return 10; - } else { - return 100; - } - } - - private void verifyFetchedLog(RowSet rowSet) { - StringBuilder stringBuilder = new StringBuilder(); - - for (Object[] row : rowSet) { - stringBuilder.append(row[0]); - } - - String logs = stringBuilder.toString(); - verifyFetchedLog(logs); - } - - private void verifyFetchedLog(String logs) { - for (String log : expectedLogs) { - Assert.assertTrue("Checking for presence of " + log, logs.contains(log)); - } - } -}