diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java index 5366b9f..31390fa 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.llap.log.LlapRoutingAppenderPurgePolicy; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.tez.common.CallableWithNdc; @@ -222,6 +223,10 @@ void fragmentComplete(QueryFragmentInfo fragmentInfo) { ShuffleHandler.get().unregisterDag(localDir, queryInfo.getAppIdString(), queryInfo.getDagIdentifier()); } } + + // LlapRoutingAppenderPurgePolicy handles the case where it is not the configured instance. + //LlapRoutingAppenderPurgePolicy.getInstance(//TODO_APPENDER_NAME).keyComplete(//TODO_dagId); + // Clearing this before sending a kill is OK, since canFinish will change to false. // Ideally this should be a state machine where kills are issued to the executor, // and the structures are cleaned up once all tasks complete. New requests, however, diff --git llap-server/src/java/org/apache/hadoop/hive/llap/log/LlapRoutingAppenderPurgePolicy.java llap-server/src/java/org/apache/hadoop/hive/llap/log/LlapRoutingAppenderPurgePolicy.java index e69de29..5350c63 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/log/LlapRoutingAppenderPurgePolicy.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/log/LlapRoutingAppenderPurgePolicy.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.log; + +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import com.google.common.base.Preconditions; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.appender.routing.PurgePolicy; +import org.apache.logging.log4j.core.appender.routing.RoutingAppender; +import org.apache.logging.log4j.core.config.plugins.Plugin; +import org.apache.logging.log4j.core.config.plugins.PluginAttribute; +import org.apache.logging.log4j.core.config.plugins.PluginFactory; +import org.apache.logging.log4j.status.StatusLogger; + +/** + * A purge policy for the {@link RoutingAppender} which awaits a notification from the application + * about a key no longer being required, before it purges it. + */ +@Plugin(name = "LlapRoutingAppenderPurgePolicy", category = "Core", printObject = true) +public class LlapRoutingAppenderPurgePolicy implements PurgePolicy { + + private static final Logger LOGGER = StatusLogger.getLogger(); + + private static final ConcurrentMap INSTANCES = + new ConcurrentHashMap<>(); + + private static final LlapRoutingAppenderPurgePolicy NOOP_INSTANCE = + new LlapRoutingAppenderPurgePolicy() { + public void keyComplete(String key) { + } + }; + + private final Set knownAppenders = + Collections.newSetFromMap(new ConcurrentHashMap()); + + // The Routing appender, which manages underlying appenders + private RoutingAppender routingAppender; + + public LlapRoutingAppenderPurgePolicy(String name) { + LOGGER.trace("Created " + LlapRoutingAppenderPurgePolicy.class.getName() + " with name=" + name); + } + + private LlapRoutingAppenderPurgePolicy() { + this("_NOOP_"); + } + + @Override + public void initialize(RoutingAppender routingAppender) { + this.routingAppender = routingAppender; + } + + @Override + public void purge() { + // Nothing to do here. This is not invoked by the log4j framework. Should likely not be in + // the log4j interface + } + + @Override + public void update(String key, LogEvent event) { + knownAppenders.add(key); + } + + /** + * Indicate that the specified key is no longer used. + * @param key + */ + public void keyComplete(String key) { + Preconditions.checkNotNull(key, "Key must be specified"); + boolean removed = knownAppenders.remove(key); + if (removed) { + LOGGER.info("Deleting Appender for key: " + key); + routingAppender.deleteAppender(key); + } else { + LOGGER.trace("Ignoring call to remove unknown key: " + key); + } + } + + /** + * Returns a handle to the {@link LlapRoutingAppenderPurgePolicy} with the specified name + * + * @param name + * @return + */ + public static LlapRoutingAppenderPurgePolicy getInstance(String name) { + LlapRoutingAppenderPurgePolicy instance = INSTANCES.get(name); + if (instance == null) { + return NOOP_INSTANCE; + } else { + return instance; + } + } + + @PluginFactory + public static PurgePolicy createPurgePolicy( + @PluginAttribute("name") final String name) { + + // Name required for routing. Error out if it is not set. + Preconditions.checkNotNull(name, + "Name must be specified for " + LlapRoutingAppenderPurgePolicy.class.getName()); + LlapRoutingAppenderPurgePolicy llapRoutingAppenderPurgePolicy = + new LlapRoutingAppenderPurgePolicy(name); + LlapRoutingAppenderPurgePolicy old = INSTANCES.put(name, llapRoutingAppenderPurgePolicy); + if (old != null) { + LOGGER.error("Attempt to create multiple instances of " + + LlapRoutingAppenderPurgePolicy.class.getName() + " with the name " + name); + } + return llapRoutingAppenderPurgePolicy; + } +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/log/LlapWrappedAppender.java llap-server/src/java/org/apache/hadoop/hive/llap/log/LlapWrappedAppender.java index e69de29..533ea5b 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/log/LlapWrappedAppender.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/log/LlapWrappedAppender.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.log; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.concurrent.atomic.AtomicReference; + +import com.google.common.base.Preconditions; +import org.apache.logging.log4j.core.Appender; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.appender.AbstractAppender; +import org.apache.logging.log4j.core.appender.RandomAccessFileAppender; +import org.apache.logging.log4j.core.config.AppenderControl; +import org.apache.logging.log4j.core.config.Configuration; +import org.apache.logging.log4j.core.config.Node; +import org.apache.logging.log4j.core.config.plugins.Plugin; +import org.apache.logging.log4j.core.config.plugins.PluginAttribute; +import org.apache.logging.log4j.core.config.plugins.PluginConfiguration; +import org.apache.logging.log4j.core.config.plugins.PluginFactory; +import org.apache.logging.log4j.core.config.plugins.PluginNode; +import org.apache.logging.log4j.util.Strings; + +/** + * An appender to wrap around RandomAccessFileAppender, and rename the file once the appender is closed. + * Could be potentially extended to other appenders by special casing them to figure out how files + * are to be handled. + */ +@Plugin(name = "LlapWrappedAppender", category = "Core", elementType = "appender", printObject = true, deferChildren = true) +public class LlapWrappedAppender extends AbstractAppender { + + private static final boolean DEFAULT_RENAME_FILES_ON_CLOSE = true; + private static final String DEFAULT_RENAMED_FILE_SUFFIX = ".done"; + + private final Node node; + private final Configuration config; + private final boolean renameFileOnClose; + private final String renamedFileSuffix; + + private AtomicReference realAppender = new AtomicReference<>(); + private AtomicReference appenderControl = new AtomicReference<>(); + + public LlapWrappedAppender(final String name, final Node node, final Configuration config, + boolean renameOnClose, String renamedFileSuffix) { + super(name, null, null); + this.node = node; + this.config = config; + this.renameFileOnClose = renameOnClose; + this.renamedFileSuffix = renamedFileSuffix; + LOGGER.debug( + LlapWrappedAppender.class.getName() + " created with name=" + name + ", renameOnClose=" + + renameOnClose + ", renamedFileSuffix=" + renamedFileSuffix); + } + + @Override + public void start() { + super.start(); + } + + @Override + public void append(LogEvent event) { + setupAppenderIfRequired(event); + if (appenderControl.get() != null) { + appenderControl.get().callAppender(event); + } + } + + private void setupAppenderIfRequired(LogEvent event) { + if (appenderControl.get() == null) { + if (node.getType().getElementName().equals("appender")) { + for (final Node cnode : node.getChildren()) { + final Node appNode = new Node(cnode); + config.createConfiguration(appNode, event); + if (appNode.getObject() instanceof Appender) { + final Appender app = appNode.getObject(); + app.start(); + Preconditions.checkArgument(app instanceof RandomAccessFileAppender, + LlapWrappedAppender.class.getName() + " only supports " + + RandomAccessFileAppender.class.getName() + ". Found appender name=" + + app.getName() + ", class=" + app.getClass().getName()); + realAppender.set(app); + appenderControl.set(new AppenderControl(app, null, null)); + break; + } + } + if (appenderControl.get() == null) { + // Fail if mis-configured. + throw new RuntimeException(LlapWrappedAppender.class.getSimpleName() + + "name=" + getName() + " unable to setup actual appender." + + "Could not find child appender"); + } + } else { + // Fail if mis-configured. + throw new RuntimeException(LlapWrappedAppender.class.getSimpleName() + + "name=" + getName() + " unable to setup actual appender." + + "Could not find child appender"); + } + } +// if (LOGGER.isInfoEnabled() && realAppender.get() != null) { +// RandomAccessFileAppender raf = (RandomAccessFileAppender) realAppender.get(); +// LOGGER.info("Writing to file: " + raf.getFileName(), +// "appenderName=" + raf.getName() + ", appenderManagerName=" + raf.getManager().getName()); +// } + } + + + @Override + public void stop() { + if (!(this.isStopping() || this.isStopped())) { + super.stop(); + if (appenderControl.get() != null) { + appenderControl.get().stop(); + realAppender.get().stop(); + } + + if (renameFileOnClose) { + RandomAccessFileAppender raf = (RandomAccessFileAppender) realAppender.get(); + raf.getFileName(); + try { + // TODO Change this logic to 1) not overwrite files, 2) generate indexed filenames to move to. + // The same file can be generated if accessed by an external service, where the query completion + // boundary is not well known. + Files.move(Paths.get(raf.getFileName()), Paths.get(raf.getFileName() + renamedFileSuffix), + StandardCopyOption.REPLACE_EXISTING); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + } + + @PluginFactory + public static LlapWrappedAppender createAppender( + @PluginAttribute("name") final String name, // This isn't really used for anything. + @PluginAttribute("renameFileOnClose") final String renameFileOnCloseProvided, + @PluginAttribute("renamedFileSuffix") final String renamedFileSuffixProvided, + @PluginNode final Node node, + @PluginConfiguration final Configuration config + ) { + if (config == null) { + LOGGER.error("PluginConfiguration not expected to be null"); + return null; + } + if (node == null) { + LOGGER.error("Node must be specified as an appender specification"); + return null; + } + + boolean renameFileOnClose = DEFAULT_RENAME_FILES_ON_CLOSE; + if (Strings.isNotBlank(renameFileOnCloseProvided)) { + renameFileOnClose = Boolean.parseBoolean(renameFileOnCloseProvided); + } + String renamedFileSuffix = DEFAULT_RENAMED_FILE_SUFFIX; + if (Strings.isNotBlank(renamedFileSuffixProvided)) { + renamedFileSuffix = renamedFileSuffixProvided; + } + + return new LlapWrappedAppender(name, node, config, renameFileOnClose, renamedFileSuffix); + } + +} \ No newline at end of file diff --git llap-server/src/main/resources/llap-daemon-log4j2.properties llap-server/src/main/resources/llap-daemon-log4j2.properties index 24f8685..cf9affe 100644 --- llap-server/src/main/resources/llap-daemon-log4j2.properties +++ llap-server/src/main/resources/llap-daemon-log4j2.properties @@ -68,6 +68,8 @@ appender.routing.type = Routing appender.routing.name = routing appender.routing.routes.type = Routes appender.routing.routes.pattern = $${ctx:dagId} +appender.routing.purgePolicy.type = LlapRoutingAppenderPurgePolicy +appender.routing.purgePolicy.name = routingPurgePolicy # default route appender.routing.routes.route-default.type = Route @@ -87,11 +89,15 @@ appender.routing.routes.route-default.file-default.strategy.max = ${sys:llap.dae # mdc key based route appender.routing.routes.route-mdc.type = Route -appender.routing.routes.route-mdc.file-mdc.type = RandomAccessFile -appender.routing.routes.route-mdc.file-mdc.name = file-${ctx:dagId} -appender.routing.routes.route-mdc.file-mdc.fileName = ${sys:llap.daemon.log.dir}/${ctx:dagId}.log -appender.routing.routes.route-mdc.file-mdc.layout.type = PatternLayout -appender.routing.routes.route-mdc.file-mdc.layout.pattern = %d{ISO8601} %5p [%t] %c{2}: %m%n +appender.routing.routes.route-mdc.file-mdc.type = LlapWrappedAppender +appender.routing.routes.route-mdc.file-mdc.name = IrrelevantName +appender.routing.routes.route-mdc.file-mdc.renameFileOnClose=true +appender.routing.routes.route-mdc.file-mdc.renamedFileSuffix=.done +appender.routing.routes.route-mdc.file-mdc.appenderDef.type=RandomAccessFile +appender.routing.routes.route-mdc.file-mdc.appenderDef.name = file-${ctx:dagId} +appender.routing.routes.route-mdc.file-mdc.appenderDef.fileName = ${sys:llap.daemon.log.dir}/${ctx:dagId}.log +appender.routing.routes.route-mdc.file-mdc.appenderRef.layout.type = PatternLayout +appender.routing.routes.route-mdc.file-mdc.appenderRef.layout.pattern = %d{ISO8601} %5p [%t] %c{2}: %m%n # list of all loggers loggers = NIOServerCnxn, ClientCnxnSocketNIO, DataNucleus, Datastore, JPOX, HistoryLogger, LlapIoImpl, LlapIoOrc, LlapIoCache, LlapIoLocking diff --git pom.xml pom.xml index 464fbe4..66a78f6 100644 --- pom.xml +++ pom.xml @@ -162,7 +162,7 @@ 3.0.3 0.9.3 0.9.3 - 2.4.1 + 2.6.1 2.3 1.9.5 2.0.0-M5