commit fe390a6ba8d1a30eed9ec387eb5c17c137308a31 Author: Andrew Sherman Date: Thu Jul 20 15:33:27 2017 -0700 HIVE-17128 Operation Logging leaks file descriptors as the log4j Appender is never closed Previously HIVE-16061 and HIVE-16400 changed Operation Logging to use the Log4j2 RoutingAppender to automatically output the log for each query into an individual operation log file. As log4j does not know when a query is finished it keeps the OutputStream in the subordinate Appender open even when the query completes. The stream holds a file descriptor and so we leak file descriptors. Note that we are already careful to close any streams reading from the operation log file. To fix this we use a technique described in the comments of LOG4J2-510 which uses reflection to close the subordinate Appender. We use this to close the per-query subordinate Appenders from both LogDivertAppender and LogDivertAppenderForTest. The test in TestOperationLoggingLayout is extended to check that the Appenders are closed when a query completes. 
diff --git common/src/java/org/apache/hadoop/hive/common/LogUtils.java common/src/java/org/apache/hadoop/hive/common/LogUtils.java index 83f3af7440253bfbcedbc8b21d745fb71c0d7fb9..0a3e0c72011951b6b1543352308bd51233c847fb 100644 --- common/src/java/org/apache/hadoop/hive/common/LogUtils.java +++ common/src/java/org/apache/hadoop/hive/common/LogUtils.java @@ -19,13 +19,22 @@ package org.apache.hadoop.hive.common; import java.io.File; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.net.URL; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.core.Appender; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.logging.log4j.core.appender.routing.RoutingAppender; +import org.apache.logging.log4j.core.config.AppenderControl; import org.apache.logging.log4j.core.config.Configurator; +import org.apache.logging.log4j.core.config.LoggerConfig; import org.apache.logging.log4j.core.impl.Log4jContextFactory; import org.apache.logging.log4j.spi.DefaultThreadContextMap; import org.slf4j.Logger; @@ -222,4 +231,36 @@ public static void registerLoggingContext(Configuration conf) { public static void unregisterLoggingContext() { MDC.clear(); } + + /** + * Stop the subordinate appender for the operation log so it will not leak a file descriptor. 
+ * @param routingAppenderName the name of the RoutingAppender + * @param queryId the id of the query that is closing + */ + public static void stopQueryAppender(String routingAppenderName, String queryId) { + LoggerContext context = (LoggerContext) LogManager.getContext(false); + org.apache.logging.log4j.core.config.Configuration configuration = context.getConfiguration(); + LoggerConfig loggerConfig = configuration.getRootLogger(); + Map appenders = loggerConfig.getAppenders(); + RoutingAppender routingAppender = (RoutingAppender) appenders.get(routingAppenderName); + // routingAppender can be null if it has not been registered + if (routingAppender != null) { + // The appender is configured to use ${ctx:queryId} by registerRoutingAppender() + try { + Class clazz = routingAppender.getClass(); + Method method = clazz.getDeclaredMethod("getControl", String.class, LogEvent.class); + method.setAccessible(true); + AppenderControl control = (AppenderControl) method.invoke(routingAppender, queryId, null); + Appender subordinateAppender = control.getAppender(); + if (!subordinateAppender.isStopped()) { + // this will cause the subordinate appender to close its output stream. 
+ subordinateAppender.stop(); + } + } catch (NoSuchMethodException | SecurityException | IllegalAccessException | + IllegalArgumentException | InvocationTargetException e) { + l4j.warn("Unable to close the operation log appender for query id " + queryId, e); + } + } + } + } diff --git itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingLayout.java itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingLayout.java index 1a8337f574bb753e8c3c48a6b477b17700b05256..3c30069adc04aea54587953159b951d1b0776705 100644 --- itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingLayout.java +++ itests/hive-unit/src/test/java/org/apache/hive/service/cli/operation/TestOperationLoggingLayout.java @@ -18,11 +18,15 @@ package org.apache.hive.service.cli.operation; import java.io.File; +import java.lang.reflect.Field; import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.log.LogDivertAppender; +import org.apache.hadoop.hive.ql.log.LogDivertAppenderForTest; import org.apache.hive.jdbc.miniHS2.MiniHS2; import org.apache.hive.service.cli.CLIServiceClient; import org.apache.hive.service.cli.FetchOrientation; @@ -30,6 +34,13 @@ import org.apache.hive.service.cli.OperationHandle; import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.SessionHandle; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.core.Appender; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.logging.log4j.core.appender.routing.RoutingAppender; +import org.apache.logging.log4j.core.config.AppenderControl; +import org.apache.logging.log4j.core.config.Configuration; +import org.apache.logging.log4j.core.config.LoggerConfig; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ 
-95,11 +106,52 @@ public void testSwitchLogLayout() throws Exception { RowSet rowSetLog = client.fetchResults(operationHandle, FetchOrientation.FETCH_FIRST, 1000, FetchType.LOG); Iterator iter = rowSetLog.iterator(); + String queryId = null; // non-verbose pattern is %-5p : %m%n. Look for " : " while (iter.hasNext()) { String row = iter.next()[0].toString(); Assert.assertEquals(true, row.matches("^.*(FATAL|ERROR|WARN|INFO|DEBUG|TRACE).*$")); + + // look for a row like "INFO : Query ID = asherman_20170718154720_17c7d18b-36e6-4b35-a8e2-f50847db58ae" + String queryIdLoggingProbe = "INFO : Query ID = "; + int index = row.indexOf(queryIdLoggingProbe); + if (index >= 0) { + queryId = row.substring(queryIdLoggingProbe.length()).trim(); + } } + Assert.assertNotNull("Could not find query id, perhaps a logging message changed", queryId); + + checkAppenderState("before operation close ", LogDivertAppender.QUERY_ROUTING_APPENDER, queryId, false); + checkAppenderState("before operation close ", LogDivertAppenderForTest.TEST_QUERY_ROUTING_APPENDER, queryId, false); + client.closeOperation(operationHandle); + checkAppenderState("after operation close ", LogDivertAppender.QUERY_ROUTING_APPENDER, queryId, true); + checkAppenderState("after operation close ", LogDivertAppenderForTest.TEST_QUERY_ROUTING_APPENDER, queryId, true); + + } + + /** + * assert that the appender for the given queryId is in the expected state + * @param msg a diagnostic + * @param routingAppenderName name of the RoutingAppender + * @param queryId the query id to use as a key + * @param expectedStopped the expected stop state + */ + private void checkAppenderState(String msg, String routingAppenderName, String queryId, + boolean expectedStopped) throws NoSuchFieldException, IllegalAccessException { + LoggerContext context = (LoggerContext) LogManager.getContext(false); + Configuration configuration = context.getConfiguration(); + LoggerConfig loggerConfig = configuration.getRootLogger(); + Map appendersMap = 
loggerConfig.getAppenders(); + RoutingAppender routingAppender = (RoutingAppender) appendersMap.get(routingAppenderName); + Assert.assertNotNull(msg + "could not find routingAppender " + routingAppenderName, routingAppender); + Field defaultsField = RoutingAppender.class.getDeclaredField("appenders"); + defaultsField.setAccessible(true); + ConcurrentHashMap appenders = (ConcurrentHashMap) defaultsField.get(routingAppender); + AppenderControl appenderControl = (AppenderControl) appenders.get(queryId); + Assert.assertNotNull(msg + "could not find AppenderControl for query id " + queryId, appenderControl); + Appender appender = appenderControl.getAppender(); + Assert.assertNotNull(msg + "could not find Appender for query id " + queryId + " from AppenderControl " + appenderControl, appender); + Assert.assertEquals(msg + "Appender for query is in unexpected state", expectedStopped, appender.isStopped()); } private SessionHandle setupSession() throws Exception { diff --git ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java index e697b545984555414e27bafe92d7f22829a22687..c6a5341007b566fe1c87c6c6e41462c21480ff3e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java +++ ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java @@ -17,21 +17,25 @@ */ package org.apache.hadoop.hive.ql.log; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Map; import java.util.regex.Pattern; import org.apache.hadoop.hive.common.LogUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.exec.Task; -import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.session.OperationLog; import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.core.Appender; import org.apache.logging.log4j.core.LogEvent; import 
org.apache.logging.log4j.core.LoggerContext; import org.apache.logging.log4j.core.appender.RandomAccessFileAppender; import org.apache.logging.log4j.core.appender.routing.Route; import org.apache.logging.log4j.core.appender.routing.Routes; import org.apache.logging.log4j.core.appender.routing.RoutingAppender; +import org.apache.logging.log4j.core.config.AppenderControl; import org.apache.logging.log4j.core.config.Configuration; import org.apache.logging.log4j.core.config.LoggerConfig; import org.apache.logging.log4j.core.config.Node; @@ -54,6 +58,10 @@ private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(LogDivertAppender.class.getName()); public static final String verboseLayout = "%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n"; public static final String nonVerboseLayout = "%-5p : %m%n"; + /** + * Name of the query routing appender. + */ + public static final String QUERY_ROUTING_APPENDER = "query-routing"; /** * A log filter that filters messages coming from the logger with the given names. @@ -146,7 +154,7 @@ public static NameFilter createFilter( /** * Programmatically register a routing appender to Log4J configuration, which * automatically writes the log of each query to an individual file. 
- * The equivilent property configuration is as follows: + * The equivalent property configuration is as follows: * # queryId based routing file appender appender.query-routing.type = Routing appender.query-routing.name = query-routing @@ -233,7 +241,7 @@ public static void registerRoutingAppender(org.apache.hadoop.conf.Configuration LoggerContext context = (LoggerContext) LogManager.getContext(false); Configuration configuration = context.getConfiguration(); - RoutingAppender routingAppender = RoutingAppender.createAppender("query-routing", + RoutingAppender routingAppender = RoutingAppender.createAppender(QUERY_ROUTING_APPENDER, "true", routes, configuration, @@ -246,4 +254,5 @@ public static void registerRoutingAppender(org.apache.hadoop.conf.Configuration context.updateLoggers(); routingAppender.start(); } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppenderForTest.java ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppenderForTest.java index 465844d66b92b371f457fda0d885d75fbfce6805..e8da18bc66db14267c0bf35f342b2c7213bfd6e7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppenderForTest.java +++ ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppenderForTest.java @@ -41,6 +41,12 @@ * CLI qtest results. */ public final class LogDivertAppenderForTest { + + /** + * Name of the test query routing appender. 
+ */ + public static final String TEST_QUERY_ROUTING_APPENDER = "test-query-routing"; + private LogDivertAppenderForTest() { // Prevent instantiation } @@ -169,7 +175,7 @@ public static void registerRoutingAppenderIfInTest(org.apache.hadoop.conf.Config Configuration configuration = context.getConfiguration(); // Create the appender - RoutingAppender routingAppender = RoutingAppender.createAppender("test-query-routing", + RoutingAppender routingAppender = RoutingAppender.createAppender(TEST_QUERY_ROUTING_APPENDER, "true", routes, configuration, diff --git service/src/java/org/apache/hive/service/cli/operation/Operation.java service/src/java/org/apache/hive/service/cli/operation/Operation.java index 8d453d5d9153c2ec86c4adc7a68bd3b5dd249743..21809f9d5b1e87aa5663768ba44976a939c6f1b5 100644 --- service/src/java/org/apache/hive/service/cli/operation/Operation.java +++ service/src/java/org/apache/hive/service/cli/operation/Operation.java @@ -18,7 +18,6 @@ package org.apache.hive.service.cli.operation; import java.io.File; -import java.io.FileNotFoundException; import java.util.EnumSet; import java.util.List; import java.util.Map; @@ -33,6 +32,8 @@ import org.apache.hadoop.hive.common.metrics.common.MetricsScope; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.log.LogDivertAppender; +import org.apache.hadoop.hive.ql.log.LogDivertAppenderForTest; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.OperationLog; import org.apache.hive.service.cli.FetchOrientation; @@ -264,6 +265,10 @@ protected synchronized void cleanupOperationLog() { } else { operationLog.close(); } + // stop the appenders for the operation log + String queryId = queryState.getQueryId(); + LogUtils.stopQueryAppender(LogDivertAppender.QUERY_ROUTING_APPENDER, queryId); + LogUtils.stopQueryAppender(LogDivertAppenderForTest.TEST_QUERY_ROUTING_APPENDER, queryId); } }