diff --git llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
index 0edbe7c..f41912b 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
@@ -37,7 +37,7 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
-import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemon;
+import org.apache.hadoop.hive.llap.daemon.impl.LlapConstants;
 import org.apache.hadoop.hive.llap.daemon.impl.StaticPermanentFunctionChecker;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
@@ -277,7 +277,7 @@ private void run(String[] args) throws Exception {
           .setProperty(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, options.getLlapQueueName());
     }
 
-    URL logger = conf.getResource(LlapDaemon.LOG4j2_PROPERTIES_FILE);
+    URL logger = conf.getResource(LlapConstants.LOG4j2_PROPERTIES_FILE);
 
     if (null == logger) {
       throw new Exception("Unable to find required config file: llap-daemon-log4j2.properties");
@@ -424,12 +424,12 @@ private void run(String[] args) throws Exception {
     IOUtils.copyBytes(loggerContent,
         lfs.create(new Path(confPath, "llap-daemon-log4j2.properties"), true), conf, true);
 
-    String metricsFile = LlapDaemon.LLAP_HADOOP_METRICS2_PROPERTIES_FILE;
+    String metricsFile = LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE;
     URL metrics2 = conf.getResource(metricsFile);
     if (metrics2 == null) {
-      LOG.warn(LlapDaemon.LLAP_HADOOP_METRICS2_PROPERTIES_FILE + " cannot be found." +
-          " Looking for " + LlapDaemon.HADOOP_METRICS2_PROPERTIES_FILE);
-      metricsFile = LlapDaemon.HADOOP_METRICS2_PROPERTIES_FILE;
+      LOG.warn(LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE + " cannot be found."
+          + " Looking for " + LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE);
+      metricsFile = LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE;
       metrics2 = conf.getResource(metricsFile);
     }
     if (metrics2 != null) {
@@ -438,8 +438,8 @@ private void run(String[] args) throws Exception {
           conf, true);
       LOG.info("Copied hadoop metrics2 properties file from " + metrics2);
     } else {
-      LOG.warn("Cannot find " + LlapDaemon.LLAP_HADOOP_METRICS2_PROPERTIES_FILE + " or " +
-          LlapDaemon.HADOOP_METRICS2_PROPERTIES_FILE + " in classpath.");
+      LOG.warn("Cannot find " + LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE + " or " +
+          LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE + " in classpath.");
     }
 
     PrintWriter udfStream =
+ + " Looking for " + LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE); + metricsFile = LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE; metrics2 = conf.getResource(metricsFile); } if (metrics2 != null) { @@ -438,8 +438,8 @@ private void run(String[] args) throws Exception { conf, true); LOG.info("Copied hadoop metrics2 properties file from " + metrics2); } else { - LOG.warn("Cannot find " + LlapDaemon.LLAP_HADOOP_METRICS2_PROPERTIES_FILE + " or " + - LlapDaemon.HADOOP_METRICS2_PROPERTIES_FILE + " in classpath."); + LOG.warn("Cannot find " + LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE + " or " + + LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE + " in classpath."); } PrintWriter udfStream = diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 103115e..a9031fd 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -223,7 +223,7 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws Token jobToken = TokenCache.getSessionToken(credentials); QueryFragmentInfo fragmentInfo = queryTracker.registerFragment( - queryIdentifier, qIdProto.getApplicationIdString(), + queryIdentifier, qIdProto.getApplicationIdString(), dagId, vertex.getDagName(), vertex.getHiveQueryId(), dagIdentifier, vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber(), vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapConstants.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapConstants.java new file mode 100644 index 0000000..cdb4070 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapConstants.java @@ -0,0 +1,31 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class LlapConstants {
+  public static final String LOG4j2_PROPERTIES_FILE = "llap-daemon-log4j2.properties";
+  public static final String LLAP_HADOOP_METRICS2_PROPERTIES_FILE = "hadoop-metrics2-llapdaemon.properties";
+  public static final String HADOOP_METRICS2_PROPERTIES_FILE = "hadoop-metrics2.properties";
+
+  // Note: Do not change without changing the corresponding reference in llap-daemon-log4j2.properties
+  public static final String LLAP_LOG4j2_PURGE_POLICY_NAME_DAG_ROUTING =
+      "llapLogPurgerDagRouting";
+  // Note: Do not change without changing the corresponding reference in llap-daemon-log4j2.properties
+  public static final String LLAP_LOG4j2_PURGE_POLICY_NAME_QUERY_ROUTING =
+      "llapLogPurgerQueryRouting";
+}
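The two "Do not change" notes encode a cross-file contract: the purge-policy names in LlapConstants must stay in sync with the appender.*.purgePolicy.name entries in llap-daemon-log4j2.properties (changed later in this patch). A minimal sketch of a sanity check for that coupling - PurgePolicyNameCheck is a hypothetical class written for illustration, not part of the patch:

```java
import java.io.InputStream;
import java.util.Properties;

import org.apache.hadoop.hive.llap.daemon.impl.LlapConstants;

// Hypothetical sanity check: the purge-policy names declared in the log4j2
// config must match the constants, since QueryTracker's completion signal and
// the configured routing appenders meet through these names.
public class PurgePolicyNameCheck {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    try (InputStream in = PurgePolicyNameCheck.class.getClassLoader()
        .getResourceAsStream(LlapConstants.LOG4j2_PROPERTIES_FILE)) {
      // Assumes the config is on the classpath; a log4j2 properties config is
      // still plain key=value and loads fine via java.util.Properties.
      props.load(in);
    }
    String dagName = props.getProperty("appender.dag-routing.purgePolicy.name");
    String queryName = props.getProperty("appender.query-routing.purgePolicy.name");
    if (!LlapConstants.LLAP_LOG4j2_PURGE_POLICY_NAME_DAG_ROUTING.equals(dagName)
        || !LlapConstants.LLAP_LOG4j2_PURGE_POLICY_NAME_QUERY_ROUTING.equals(queryName)) {
      throw new IllegalStateException("LlapConstants and llap-daemon-log4j2.properties disagree");
    }
  }
}
```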
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 91b8727..5457658 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -27,7 +27,6 @@
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.regex.Pattern;
 
 import javax.management.ObjectName;
 
@@ -85,9 +84,6 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(LlapDaemon.class);
 
-  public static final String LOG4j2_PROPERTIES_FILE = "llap-daemon-log4j2.properties";
-  public static final String LLAP_HADOOP_METRICS2_PROPERTIES_FILE = "hadoop-metrics2-llapdaemon.properties";
-  public static final String HADOOP_METRICS2_PROPERTIES_FILE = "hadoop-metrics2.properties";
   private final Configuration shuffleHandlerConf;
   private final SecretManager secretManager;
   private final LlapProtocolServerImpl server;
@@ -284,7 +280,8 @@ public LlapDaemon(Configuration daemonConf, int numExecutors, long executorMemor
 
   private void initializeLogging(final Configuration conf) {
     long start = System.currentTimeMillis();
-    URL llap_l4j2 = LlapDaemon.class.getClassLoader().getResource(LOG4j2_PROPERTIES_FILE);
+    URL llap_l4j2 = LlapDaemon.class.getClassLoader().getResource(
+        LlapConstants.LOG4j2_PROPERTIES_FILE);
     if (llap_l4j2 != null) {
       final boolean async = LogUtils.checkAndSetAsyncLogging(conf);
       // required for MDC based routing appender so that child threads can inherit the MDC context
@@ -295,7 +292,7 @@ private void initializeLogging(final Configuration conf) {
           llap_l4j2, (end - start), async);
     } else {
       throw new RuntimeException("Log initialization failed." +
-          " Unable to locate " + LOG4j2_PROPERTIES_FILE + " file in classpath");
+          " Unable to locate " + LlapConstants.LOG4j2_PROPERTIES_FILE + " file in classpath");
     }
   }
 
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryIdentifier.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryIdentifier.java
index 96e77e4..00492ac 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryIdentifier.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryIdentifier.java
@@ -16,6 +16,14 @@
 
 /**
  * An identifier for a query, which is unique.
+ *
+ * This is based on the AppId and dagId.
+ *
+ * At the moment, it's possible for Hive to reuse the same "hive.query.id" when a single query
+ * is split into stages, which prevents the hive.query.id from being used as the unique id here.
+ *
+ * Getters are deliberately not exposed, so the internals can evolve while retaining uniqueness.
+ *
  */
 public final class QueryIdentifier {
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
index 6914134..8e5735f 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -19,7 +19,6 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -34,6 +33,7 @@
 import com.google.common.collect.Multimap;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
@@ -41,7 +41,9 @@ public class QueryInfo {
   private final QueryIdentifier queryIdentifier;
   private final String appIdString;
+  private final String dagIdString;
   private final String dagName;
+  private final String hiveQueryIdString;
   private final int dagIdentifier;
   private final String user;
   private final String[] localDirsBase;
@@ -57,12 +59,17 @@ public class QueryInfo {
   private final FinishableStateTracker finishableStateTracker = new FinishableStateTracker();
   private final String tokenUserName, appId;
 
-  public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dagName,
-      int dagIdentifier, String user, ConcurrentMap<String, SourceStateProto> sourceStateMap,
-      String[] localDirsBase, FileSystem localFs, String tokenUserName, String tokenAppId) {
+  public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dagIdString,
+      String dagName, String hiveQueryIdString,
+      int dagIdentifier, String user,
+      ConcurrentMap<String, SourceStateProto> sourceStateMap,
+      String[] localDirsBase, FileSystem localFs, String tokenUserName,
+      String tokenAppId) {
     this.queryIdentifier = queryIdentifier;
     this.appIdString = appIdString;
+    this.dagIdString = dagIdString;
     this.dagName = dagName;
+    this.hiveQueryIdString = hiveQueryIdString;
     this.dagIdentifier = dagIdentifier;
     this.sourceStateMap = sourceStateMap;
     this.user = user;
@@ -80,6 +87,14 @@ public class QueryInfo {
   public String getAppIdString() {
     return appIdString;
   }
 
+  public String getDagIdString() {
+    return dagIdString;
+  }
+
+  public String getHiveQueryIdString() {
+    return hiveQueryIdString;
+  }
+
   public int getDagIdentifier() {
     return dagIdentifier;
   }
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index 5366b9f..e078fe1 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -21,8 +21,11 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.llap.log.Log4jQueryCompleteMarker;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.log4j.MDC;
+import org.apache.logging.slf4j.Log4jMarker;
 import org.apache.tez.common.CallableWithNdc;
 
 import org.apache.hadoop.service.AbstractService;
@@ -38,6 +41,7 @@
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.Marker;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -57,11 +61,14 @@ public class QueryTracker extends AbstractService {
 
   private static final Logger LOG = LoggerFactory.getLogger(QueryTracker.class);
 
+  private static final Marker QUERY_COMPLETE_MARKER = new Log4jMarker(new Log4jQueryCompleteMarker());
+
   private final ScheduledExecutorService executorService;
 
   private final ConcurrentHashMap<QueryIdentifier, QueryInfo> queryInfoMap = new ConcurrentHashMap<>();
+
   private final String[] localDirsBase;
   private final FileSystem localFs;
   private final String clusterId;
@@ -116,13 +123,11 @@ public QueryTracker(Configuration conf, String[] localDirsBase, String clusterId
 
   /**
    * Register a new fragment for a specific query
    */
-  QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appIdString,
+  QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appIdString, String dagIdString,
       String dagName, String hiveQueryIdString, int dagIdentifier, String vertexName,
       int fragmentNumber, int attemptNumber, String user, SignableVertexSpec vertex,
       Token<JobTokenIdentifier> appToken, String fragmentIdString, LlapTokenInfo tokenInfo)
      throws IOException {
-    // QueryIdentifier is enough to uniquely identify a fragment. At the moment, it works off of appId and dag index.
-    // At a later point this could be changed to the Hive query identifier.
-    // Sending both over RPC is unnecessary overhead.
+
     ReadWriteLock dagLock = getDagLock(queryIdentifier);
     dagLock.readLock().lock();
     try {
@@ -144,9 +149,11 @@ QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appId
         if (UserGroupInformation.isSecurityEnabled()) {
           Preconditions.checkNotNull(tokenInfo.userName);
         }
-        queryInfo = new QueryInfo(queryIdentifier, appIdString, dagName, dagIdentifier, user,
-            getSourceCompletionMap(queryIdentifier), localDirsBase, localFs,
-            tokenInfo.userName, tokenInfo.appId);
+        queryInfo =
+            new QueryInfo(queryIdentifier, appIdString, dagIdString, dagName, hiveQueryIdString,
+                dagIdentifier, user,
+                getSourceCompletionMap(queryIdentifier), localDirsBase, localFs,
+                tokenInfo.userName, tokenInfo.appId);
         QueryInfo old = queryInfoMap.putIfAbsent(queryIdentifier, queryInfo);
         if (old != null) {
           queryInfo = old;
@@ -222,6 +229,22 @@ void fragmentComplete(QueryFragmentInfo fragmentInfo) {
         ShuffleHandler.get().unregisterDag(localDir, queryInfo.getAppIdString(), queryInfo.getDagIdentifier());
       }
     }
+
+    // Inform the routing purgePolicy.
+    // Send out a fake log message at the ERROR level with the MDC for this query setup. With an
+    // LLAP custom appender this message will not be logged.
+    final String dagId = queryInfo.getDagIdString();
+    final String queryId = queryInfo.getHiveQueryIdString();
+    MDC.put("dagId", dagId);
+    MDC.put("queryId", queryId);
+    try {
+      LOG.error(QUERY_COMPLETE_MARKER, "Ignore this. Log line to interact with logger." +
+          " Query complete: " + queryInfo.getHiveQueryIdString() + ", " +
+          queryInfo.getDagIdString());
+    } finally {
+      MDC.clear();
+    }
+
     // Clearing this before sending a kill is OK, since canFinish will change to false.
     // Ideally this should be a state machine where kills are issued to the executor,
     // and the structures are cleaned up once all tasks complete. New requests, however,
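The fragmentComplete() change above is the application half of the log-routing lifecycle: routed appenders are keyed off the dagId/queryId MDC values, and the marker-tagged ERROR event is the signal that lets the purge policy (added below) close and rename the per-query log file. A condensed sketch of that interaction, mirroring the patch's calls but using made-up ids ("query_1", "dag_1_1"):

```java
import org.apache.hadoop.hive.llap.log.Log4jQueryCompleteMarker;
import org.apache.log4j.MDC;
import org.apache.logging.slf4j.Log4jMarker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;

public class RoutingLifecycleSketch {
  private static final Logger LOG = LoggerFactory.getLogger(RoutingLifecycleSketch.class);
  private static final Marker QUERY_COMPLETE_MARKER =
      new Log4jMarker(new Log4jQueryCompleteMarker());

  public static void main(String[] args) {
    // 1. While fragments run: the MDC carries the routing keys, so the
    //    query-routing appender writes this line to <queryId>-<dagId>.log.
    MDC.put("dagId", "dag_1_1");
    MDC.put("queryId", "query_1");
    try {
      LOG.info("Fragment log line - routed to the per-query file");
    } finally {
      MDC.clear();
    }

    // 2. On completion: same keys plus the EOF marker. LlapWrappedAppender
    //    drops the message itself; the purge policy sees the event, removes
    //    the per-query appender, and the file gets renamed on close.
    MDC.put("dagId", "dag_1_1");
    MDC.put("queryId", "query_1");
    try {
      LOG.error(QUERY_COMPLETE_MARKER, "Query complete signal");
    } finally {
      MDC.clear();
    }
  }
}
```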
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/log/LlapRoutingAppenderPurgePolicy.java llap-server/src/java/org/apache/hadoop/hive/llap/log/LlapRoutingAppenderPurgePolicy.java
new file mode 100644
index 0000000..f47815e
--- /dev/null
+++ llap-server/src/java/org/apache/hadoop/hive/llap/log/LlapRoutingAppenderPurgePolicy.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.log;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.google.common.base.Preconditions;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.Marker;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.routing.PurgePolicy;
+import org.apache.logging.log4j.core.appender.routing.RoutingAppender;
+import org.apache.logging.log4j.core.config.plugins.Plugin;
+import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
+import org.apache.logging.log4j.core.config.plugins.PluginFactory;
+import org.apache.logging.log4j.status.StatusLogger;
+
+/**
+ * A purge policy for the {@link RoutingAppender} which awaits a notification from the application
+ * about a key no longer being required, before it purges it.
+ */
+@Plugin(name = "LlapRoutingAppenderPurgePolicy", category = "Core", printObject = true)
+public class LlapRoutingAppenderPurgePolicy implements PurgePolicy {
+
+  private static final Logger LOGGER = StatusLogger.getLogger();
+
+  private static final ConcurrentMap<String, LlapRoutingAppenderPurgePolicy> INSTANCES =
+      new ConcurrentHashMap<>();
+
+  private final Set<String> knownAppenders =
+      Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+  private final String name;
+
+  // The Routing appender, which manages underlying appenders
+  private RoutingAppender routingAppender;
+
+  public LlapRoutingAppenderPurgePolicy(String name) {
+    LOGGER.trace("Created " + LlapRoutingAppenderPurgePolicy.class.getName() + " with name=" + name);
+    this.name = name;
+  }
+
+  private LlapRoutingAppenderPurgePolicy() {
+    this("_NOOP_");
+  }
+
+  @Override
+  public void initialize(RoutingAppender routingAppender) {
+    this.routingAppender = routingAppender;
+  }
+
+  @Override
+  public void purge() {
+    // Nothing to do here. This is not invoked by the log4j framework. Should likely not be in
+    // the log4j interface.
+  }
+
+  @Override
+  public void update(String key, LogEvent event) {
+    Marker marker = event.getMarker();
+    if (marker != null && marker.getName() != null &&
+        marker.getName().equals(Log4jQueryCompleteMarker.EOF_MARKER)) {
+      LOGGER.debug("Received " + Log4jQueryCompleteMarker.EOF_MARKER + " for key. Attempting cleanup.");
+      keyComplete(key);
+    } else {
+      if (knownAppenders.add(key)) {
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug("Registered key: [" + key + "] on purgePolicyWithName=" + name +
+              ", thisAddress=" + System.identityHashCode(this));
+        }
+      }
+    }
+  }
+
+  /**
+   * Indicate that the specified key is no longer used.
+   * @param key the routing key whose underlying appender can be purged
+   */
+  private void keyComplete(String key) {
+    Preconditions.checkNotNull(key, "Key must be specified");
+    boolean removed = knownAppenders.remove(key);
+    if (removed) {
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Deleting Appender for key: " + key);
+      }
+      routingAppender.deleteAppender(key);
+    } else {
+      LOGGER.trace("Ignoring call to remove unknown key: " + key);
+    }
+  }
+
+  @PluginFactory
+  public static PurgePolicy createPurgePolicy(
+      @PluginAttribute("name") final String name) {
+
+    // Name required for routing. Error out if it is not set.
+    Preconditions.checkNotNull(name,
+        "Name must be specified for " + LlapRoutingAppenderPurgePolicy.class.getName());
+    LlapRoutingAppenderPurgePolicy llapRoutingAppenderPurgePolicy =
+        new LlapRoutingAppenderPurgePolicy(name);
+    LlapRoutingAppenderPurgePolicy old = INSTANCES.putIfAbsent(name, llapRoutingAppenderPurgePolicy);
+    if (old != null) {
+      LOGGER.debug("Attempt to create multiple instances of " +
+          LlapRoutingAppenderPurgePolicy.class.getName() + " with the name " + name +
+          ". Using original instance");
+      llapRoutingAppenderPurgePolicy = old;
+    }
+    return llapRoutingAppenderPurgePolicy;
+  }
+}
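One detail worth noting in the factory above: createPurgePolicy memoizes instances in INSTANCES by name, so however many times the configuration is (re)processed, all routes under the same configured name share one policy and therefore one knownAppenders set. A small sketch of that behavior (hypothetical driver, not part of the patch):

```java
import org.apache.hadoop.hive.llap.log.LlapRoutingAppenderPurgePolicy;
import org.apache.logging.log4j.core.appender.routing.PurgePolicy;

public class PurgePolicyFactorySketch {
  public static void main(String[] args) {
    // Two factory calls with the same name resolve to the same instance;
    // the second call discards its fresh object and returns the memoized one.
    PurgePolicy first = LlapRoutingAppenderPurgePolicy.createPurgePolicy("llapLogPurgerQueryRouting");
    PurgePolicy second = LlapRoutingAppenderPurgePolicy.createPurgePolicy("llapLogPurgerQueryRouting");
    System.out.println(first == second); // true
  }
}
```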
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/log/LlapWrappedAppender.java llap-server/src/java/org/apache/hadoop/hive/llap/log/LlapWrappedAppender.java
new file mode 100644
index 0000000..f8fdcd4
--- /dev/null
+++ llap-server/src/java/org/apache/hadoop/hive/llap/log/LlapWrappedAppender.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.log;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.base.Preconditions;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
+import org.apache.logging.log4j.core.appender.RandomAccessFileAppender;
+import org.apache.logging.log4j.core.config.AppenderControl;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.Node;
+import org.apache.logging.log4j.core.config.plugins.Plugin;
+import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
+import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
+import org.apache.logging.log4j.core.config.plugins.PluginFactory;
+import org.apache.logging.log4j.core.config.plugins.PluginNode;
+import org.apache.logging.log4j.util.Strings;
+
+/**
+ * An appender to wrap around RandomAccessFileAppender, and rename the file once the appender is
+ * closed.
+ * Could be potentially extended to other appenders by special casing them to figure out how files
+ * are to be handled.
+ */
+@Plugin(name = "LlapWrappedAppender", category = "Core",
+    elementType = "appender", printObject = true, deferChildren = true)
+public class LlapWrappedAppender extends AbstractAppender {
+
+  private static final boolean DEFAULT_RENAME_FILES_ON_CLOSE = true;
+  private static final String DEFAULT_RENAMED_FILE_SUFFIX = ".done";
+
+  private final Node node;
+  private final Configuration config;
+  private final boolean renameFileOnClose;
+  private final String renamedFileSuffix;
+
+  private AtomicReference<Appender> realAppender = new AtomicReference<>();
+  private AtomicReference<AppenderControl> appenderControl = new AtomicReference<>();
+
+  public LlapWrappedAppender(final String name, final Node node, final Configuration config,
+      boolean renameOnClose, String renamedFileSuffix) {
+    super(name, null, null);
+    this.node = node;
+    this.config = config;
+    this.renameFileOnClose = renameOnClose;
+    this.renamedFileSuffix = renamedFileSuffix;
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(
+          LlapWrappedAppender.class.getName() + " created with name=" + name + ", renameOnClose=" +
+              renameOnClose + ", renamedFileSuffix=" + renamedFileSuffix);
+    }
+  }
+
+  @Override
+  public void start() {
+    super.start();
+  }
+
+  @Override
+  public void append(LogEvent event) {
+    setupAppenderIfRequired(event);
+    if (appenderControl.get() != null) {
+      if (!(event.getMarker() != null && event.getMarker().getName() != null &&
+          event.getMarker().getName().equals(Log4jQueryCompleteMarker.EOF_MARKER))) {
+        appenderControl.get().callAppender(event);
+      } else {
+        LOGGER.debug("Not forwarding message with marker={}, marker.getName={}", event.getMarker(),
+            (event.getMarker() == null ? "nullMarker" : event.getMarker().getName()));
+      }
+    }
+  }
"nullMarker" : event.getMarker().getName())); + } + } + } + + private void setupAppenderIfRequired(LogEvent event) { + if (appenderControl.get() == null) { + if (node.getType().getElementName().equalsIgnoreCase("appender")) { + for (final Node cnode : node.getChildren()) { + final Node appNode = new Node(cnode); + config.createConfiguration(appNode, event); + if (appNode.getObject() instanceof Appender) { + final Appender app = appNode.getObject(); + app.start(); + Preconditions.checkArgument(app instanceof RandomAccessFileAppender, + LlapWrappedAppender.class.getName() + " only supports " + + RandomAccessFileAppender.class.getName() + ". Found appender name=" + + app.getName() + ", class=" + app.getClass().getName()); + realAppender.set(app); + appenderControl.set(new AppenderControl(app, null, null)); + if (LOGGER.isDebugEnabled()) { + RandomAccessFileAppender raf = (RandomAccessFileAppender) app; + LOGGER.debug( + "Setup new appender to write to file: " + raf.getFileName() + ", appenderName=" + + raf.getName() + ", appenderManagerName=" + raf.getManager().getName()); + + } + break; + } + } + if (appenderControl.get() == null) { + // Fail if mis-configured. + throw new RuntimeException(LlapWrappedAppender.class.getSimpleName() + + "name=" + getName() + " unable to setup actual appender." + + "Could not find child appender"); + } + } else { + // Fail if mis-configured. + throw new RuntimeException(LlapWrappedAppender.class.getSimpleName() + + "name=" + getName() + " unable to setup actual appender." + + "Could not find child appender"); + } + } + } + + + @Override + public void stop() { + if (!(this.isStopping() || this.isStopped())) { + super.stop(); + if (appenderControl.get() != null) { + appenderControl.get().stop(); + realAppender.get().stop(); + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Stop invoked for " + ((RandomAccessFileAppender) realAppender.get()).getFileName()); + } + + RandomAccessFileAppender raf = (RandomAccessFileAppender) realAppender.get(); + Path renamedPath = null; + if (renameFileOnClose) { + try { + // Look for a file to which we can move the existing file. With external services, + // it's possible for the service to be marked complete after each fragment. + int counter = 0; + while(true) { + renamedPath = getRenamedPath(raf.getFileName(), counter); + if (!Files.exists(renamedPath)) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Renaming file: " + raf.getFileName() + " to " + renamedPath); + } + Files.move(Paths.get(raf.getFileName()), renamedPath); + break; + } + counter++; + } + } catch (IOException e) { + // Bail on an exception - out of the loop. + LOGGER.warn("Failed to rename file: " + raf.getFileName() + " to " + renamedPath, e); + } + } + } + } + + private Path getRenamedPath(String originalFileName, int iteration) { + Path renamedPath; + if (iteration == 0) { + renamedPath = Paths.get(originalFileName + renamedFileSuffix); + } else { + renamedPath = Paths.get(originalFileName + "." + iteration + renamedFileSuffix); + } + return renamedPath; + } + + @PluginFactory + public static LlapWrappedAppender createAppender( + @PluginAttribute("name") final String name, // This isn't really used for anything. 
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/log/Log4jQueryCompleteMarker.java llap-server/src/java/org/apache/hadoop/hive/llap/log/Log4jQueryCompleteMarker.java
new file mode 100644
index 0000000..97ffea3
--- /dev/null
+++ llap-server/src/java/org/apache/hadoop/hive/llap/log/Log4jQueryCompleteMarker.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.log;
+
+import org.apache.logging.log4j.Marker;
+
+public class Log4jQueryCompleteMarker implements Marker {
+
+  public static final String EOF_MARKER = "EOF_MARKER";
+
+  @Override
+  public Marker addParents(Marker... markers) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public String getName() {
+    return EOF_MARKER;
+  }
+
+  @Override
+  public Marker[] getParents() {
+    return new Marker[0];
+  }
+
+  @Override
+  public boolean hasParents() {
+    return false;
+  }
+
+  @Override
+  public boolean isInstanceOf(Marker m) {
+    return m != null && this.getName().equals(m.getName());
+  }
+
+  @Override
+  public boolean isInstanceOf(String name) {
+    return false;
+  }
+
+  @Override
+  public boolean remove(Marker marker) {
+    return false;
+  }
+
+  @Override
+  public Marker setParents(Marker... markers) {
+    return null;
+  }
+}
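Log4jQueryCompleteMarker is deliberately minimal: both LlapWrappedAppender and LlapRoutingAppenderPurgePolicy recognize the signal purely by marker name rather than by instance, presumably because marker objects can be re-created as events cross the slf4j-to-log4j2 bridge. The shared check, reduced to a helper (hypothetical, for illustration only):

```java
import org.apache.hadoop.hive.llap.log.Log4jQueryCompleteMarker;
import org.apache.logging.log4j.Marker;

public class MarkerCheckSketch {
  // The name-based check both the wrapped appender and the purge policy
  // apply to every event they see.
  static boolean isQueryCompleteSignal(Marker marker) {
    return marker != null && marker.getName() != null
        && marker.getName().equals(Log4jQueryCompleteMarker.EOF_MARKER);
  }

  public static void main(String[] args) {
    System.out.println(isQueryCompleteSignal(new Log4jQueryCompleteMarker())); // true
    System.out.println(isQueryCompleteSignal(null));                           // false
  }
}
```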
diff --git llap-server/src/main/resources/llap-daemon-log4j2.properties llap-server/src/main/resources/llap-daemon-log4j2.properties
index 1a0387c..ffa0bf8 100644
--- llap-server/src/main/resources/llap-daemon-log4j2.properties
+++ llap-server/src/main/resources/llap-daemon-log4j2.properties
@@ -57,12 +57,15 @@
 appender.RFA.strategy.max = ${sys:llap.daemon.log.maxbackupindex}
 
 appender.HISTORYAPPENDER.type = RollingRandomAccessFile
 appender.HISTORYAPPENDER.name = HISTORYAPPENDER
 appender.HISTORYAPPENDER.fileName = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.historylog.file}
-appender.HISTORYAPPENDER.filePattern = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.historylog.file}_%i
+appender.HISTORYAPPENDER.filePattern = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.historylog.file}_%d{yyyy-MM-dd}_%i
 appender.HISTORYAPPENDER.layout.type = PatternLayout
 appender.HISTORYAPPENDER.layout.pattern = %m%n
 appender.HISTORYAPPENDER.policies.type = Policies
 appender.HISTORYAPPENDER.policies.size.type = SizeBasedTriggeringPolicy
 appender.HISTORYAPPENDER.policies.size.size = ${sys:llap.daemon.log.maxfilesize}
+appender.HISTORYAPPENDER.policies.time.type = TimeBasedTriggeringPolicy
+appender.HISTORYAPPENDER.policies.time.interval = 1
+appender.HISTORYAPPENDER.policies.time.modulate = true
 appender.HISTORYAPPENDER.strategy.type = DefaultRolloverStrategy
 appender.HISTORYAPPENDER.strategy.max = ${sys:llap.daemon.log.maxbackupindex}
 
@@ -71,34 +74,46 @@
 appender.dag-routing.type = Routing
 appender.dag-routing.name = dag-routing
 appender.dag-routing.routes.type = Routes
 appender.dag-routing.routes.pattern = $${ctx:dagId}
+# Purge policy for dag-based Routing Appender
+appender.dag-routing.purgePolicy.type = LlapRoutingAppenderPurgePolicy
+# Note: Do not change this name without changing the corresponding entry in LlapConstants
+appender.dag-routing.purgePolicy.name = llapLogPurgerDagRouting
 # default route
 appender.dag-routing.routes.route-default.type = Route
 appender.dag-routing.routes.route-default.key = $${ctx:dagId}
 appender.dag-routing.routes.route-default.ref = RFA
 # dagId based route
 appender.dag-routing.routes.route-mdc.type = Route
-appender.dag-routing.routes.route-mdc.file-mdc.type = RandomAccessFile
-appender.dag-routing.routes.route-mdc.file-mdc.name = file-mdc
-appender.dag-routing.routes.route-mdc.file-mdc.fileName = ${sys:llap.daemon.log.dir}/${ctx:dagId}.log
-appender.dag-routing.routes.route-mdc.file-mdc.layout.type = PatternLayout
-appender.dag-routing.routes.route-mdc.file-mdc.layout.pattern = %d{ISO8601} %5p [%t (%X{fragmentId})] %c{2}: %m%n
+appender.dag-routing.routes.route-mdc.file-mdc.type = LlapWrappedAppender
+appender.dag-routing.routes.route-mdc.file-mdc.name = IrrelevantName-dag-routing
+appender.dag-routing.routes.route-mdc.file-mdc.app.type = RandomAccessFile
+appender.dag-routing.routes.route-mdc.file-mdc.app.name = file-mdc
+appender.dag-routing.routes.route-mdc.file-mdc.app.fileName = ${sys:llap.daemon.log.dir}/${ctx:dagId}.log
+appender.dag-routing.routes.route-mdc.file-mdc.app.layout.type = PatternLayout
+appender.dag-routing.routes.route-mdc.file-mdc.app.layout.pattern = %d{ISO8601} %5p [%t (%X{fragmentId})] %c{2}: %m%n
 
 # queryId based routing file appender
 appender.query-routing.type = Routing
 appender.query-routing.name = query-routing
 appender.query-routing.routes.type = Routes
 appender.query-routing.routes.pattern = $${ctx:queryId}
+# Purge policy for query-based Routing Appender
+appender.query-routing.purgePolicy.type = LlapRoutingAppenderPurgePolicy
+# Note: Do not change this name without changing the corresponding entry in LlapConstants
+appender.query-routing.purgePolicy.name = llapLogPurgerQueryRouting
 # default route
 appender.query-routing.routes.route-default.type = Route
 appender.query-routing.routes.route-default.key = $${ctx:queryId}
 appender.query-routing.routes.route-default.ref = RFA
 # queryId based route
 appender.query-routing.routes.route-mdc.type = Route
-appender.query-routing.routes.route-mdc.file-mdc.type = RandomAccessFile
-appender.query-routing.routes.route-mdc.file-mdc.name = file-mdc
-appender.query-routing.routes.route-mdc.file-mdc.fileName = ${sys:llap.daemon.log.dir}/${ctx:queryId}.log
-appender.query-routing.routes.route-mdc.file-mdc.layout.type = PatternLayout
-appender.query-routing.routes.route-mdc.file-mdc.layout.pattern = %d{ISO8601} %5p [%t (%X{fragmentId})] %c{2}: %m%n
+appender.query-routing.routes.route-mdc.file-mdc.type = LlapWrappedAppender
+appender.query-routing.routes.route-mdc.file-mdc.name = IrrelevantName-query-routing
+appender.query-routing.routes.route-mdc.file-mdc.app.type = RandomAccessFile
+appender.query-routing.routes.route-mdc.file-mdc.app.name = file-mdc
+appender.query-routing.routes.route-mdc.file-mdc.app.fileName = ${sys:llap.daemon.log.dir}/${ctx:queryId}-${ctx:dagId}.log
+appender.query-routing.routes.route-mdc.file-mdc.app.layout.type = PatternLayout
+appender.query-routing.routes.route-mdc.file-mdc.app.layout.pattern = %d{ISO8601} %5p [%t (%X{fragmentId})] %c{2}: %m%n
 
 # list of all loggers
 loggers = NIOServerCnxn, ClientCnxnSocketNIO, DataNucleus, Datastore, JPOX, HistoryLogger, LlapIoImpl, LlapIoOrc, LlapIoCache, LlapIoLocking
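For readers unfamiliar with log4j2 Routing appenders: the $${ctx:queryId} pattern is escaped so it is resolved per event rather than at configuration time. When the thread's context map carries a queryId, the event routes to the LlapWrappedAppender-wrapped file appender above; when it does not, the pattern resolves to the literal ${ctx:queryId}, which matches the route-default key, and the event falls through to the daemon-wide RFA appender. A sketch of the two cases from the application side, with illustrative values:

```java
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.ThreadContext;

public class RouteSelectionSketch {
  private static final Logger LOG = LogManager.getLogger(RouteSelectionSketch.class);

  public static void main(String[] args) {
    // No queryId in the context map: the routes pattern resolves to the
    // literal "${ctx:queryId}", matching route-default, so this goes to RFA.
    LOG.info("Daemon-level line - default route");

    // With routing keys set, the same logger call lands in query_1-dag_1_1.log.
    ThreadContext.put("queryId", "query_1");
    ThreadContext.put("dagId", "dag_1_1");
    try {
      LOG.info("Per-query line - routed via the query-routing appender");
    } finally {
      ThreadContext.clearAll();
    }
  }
}
```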
diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index cc2ca25..73bb68e 100644
--- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -75,7 +75,8 @@ public static QueryFragmentInfo createQueryFragmentInfo(
 
   public static QueryInfo createQueryInfo() {
     QueryIdentifier queryIdentifier = new QueryIdentifier("fake_app_id_string", 1);
     QueryInfo queryInfo =
-        new QueryInfo(queryIdentifier, "fake_app_id_string", "fake_dag_name", 1, "fakeUser",
+        new QueryInfo(queryIdentifier, "fake_app_id_string", "fake_dag_id_string", "fake_dag_name",
+            "fakeHiveQueryId", 1, "fakeUser",
             new ConcurrentHashMap<String, SourceStateProto>(), new String[0], null, "fakeUser",
             null);
     return queryInfo;
diff --git pom.xml pom.xml
index 464fbe4..c43e25f 100644
--- pom.xml
+++ pom.xml
@@ -162,7 +162,7 @@
     <kryo.version>3.0.3</kryo.version>
    <libfb303.version>0.9.3</libfb303.version>
    <libthrift.version>0.9.3</libthrift.version>
-    <log4j2.version>2.4.1</log4j2.version>
+    <log4j2.version>2.6.2</log4j2.version>
    <opencsv.version>2.3</opencsv.version>
    <mockito-all.version>1.9.5</mockito-all.version>
    <mina.version>2.0.0-M5</mina.version>
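A note on the pom.xml change: the only dependency touched is log4j2, bumped from 2.4.1 to 2.6.2, presumably because the PurgePolicy hook on the Routing appender that LlapRoutingAppenderPurgePolicy plugs into, and the RoutingAppender#deleteAppender(String) call the policy uses for cleanup, are only available in the log4j2 2.6 line.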