diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java new file mode 100644 index 0000000..3177501 --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java @@ -0,0 +1,148 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.daemon; + +import org.apache.log4j.Logger; + +public class HistoryLogger { + + + private static final String HISTORY_EVENT_TYPE = "Event"; + private static final String HISTORY_APPLICATION_ID = "ApplicationId"; + private static final String HISTORY_CONTAINER_ID = "ContainerId"; + private static final String HISTORY_SUBMIT_TIME = "SubmitTime"; + private static final String HISTORY_START_TIME = "StartTime"; + private static final String HISTORY_END_TIME = "EndTime"; + private static final String HISTORY_DAG_NAME = "DagName"; + private static final String HISTORY_VERTEX_NAME = "VertexName"; + private static final String HISTORY_TASK_ID = "TaskId"; + private static final String HISTORY_ATTEMPT_ID = "TaskAttemptId"; + private static final String HISTORY_HOSTNAME = "HostName"; + private static final String HISTORY_SUCCEEDED = "Succeeded"; + + private static final String EVENT_TYPE_FRAGMENT_START = "FRAGMENT_START"; + private static final String EVENT_TYPE_FRAGMENT_END = "FRAGMENT_END"; + + private static final Logger HISTORY_LOGGER = 
Logger.getLogger(HistoryLogger.class); + + public static void logFragmentStart(String applicationIdStr, String containerIdStr, + String hostname, + String dagName, String vertexName, int taskId, + int attemptId) { + HISTORY_LOGGER.info( + constructFragmentStartString(applicationIdStr, containerIdStr, hostname, dagName, + vertexName, taskId, attemptId)); + } + + public static void logFragmentEnd(String applicationIdStr, String containerIdStr, String hostname, + String dagName, String vertexName, int taskId, int attemptId, + long startTime, boolean failed) { + HISTORY_LOGGER.info(constructFragmentEndString(applicationIdStr, containerIdStr, hostname, + dagName, vertexName, taskId, attemptId, startTime, failed)); + } + + + private static String constructFragmentStartString(String applicationIdStr, String containerIdStr, + String hostname, String dagName, + String vertexName, int taskId, int attemptId) { + HistoryLineBuilder lb = new HistoryLineBuilder(EVENT_TYPE_FRAGMENT_START); + lb.addHostName(hostname); + lb.addAppid(applicationIdStr); + lb.addContainerId(containerIdStr); + lb.addDagName(dagName); + lb.addVertexName(vertexName); + lb.addTaskId(taskId); + lb.addTaskAttemptId(attemptId); + lb.addTime(HISTORY_SUBMIT_TIME); + return lb.toString(); + } + + private static String constructFragmentEndString(String applicationIdStr, String containerIdStr, + String hostname, String dagName, + String vertexName, int taskId, int attemptId, + long startTime, boolean succeeded) { + HistoryLineBuilder lb = new HistoryLineBuilder(EVENT_TYPE_FRAGMENT_END); + lb.addHostName(hostname); + lb.addAppid(applicationIdStr); + lb.addContainerId(containerIdStr); + lb.addDagName(dagName); + lb.addVertexName(vertexName); + lb.addTaskId(taskId); + lb.addTaskAttemptId(attemptId); + lb.addSuccessStatus(succeeded); + lb.addTime(HISTORY_START_TIME, startTime); + lb.addTime(HISTORY_END_TIME); + return lb.toString(); + } + + private static class HistoryLineBuilder { + private final StringBuilder sb = 
new StringBuilder(); + + HistoryLineBuilder(String eventType) { + sb.append(HISTORY_EVENT_TYPE).append("=").append(eventType); + } + + HistoryLineBuilder addHostName(String hostname) { + return setKeyValue(HISTORY_HOSTNAME, hostname); + } + + HistoryLineBuilder addAppid(String appId) { + return setKeyValue(HISTORY_APPLICATION_ID, appId); + } + + HistoryLineBuilder addContainerId(String containerId) { + return setKeyValue(HISTORY_CONTAINER_ID, containerId); + } + + HistoryLineBuilder addDagName(String dagName) { + return setKeyValue(HISTORY_DAG_NAME, dagName); + } + + HistoryLineBuilder addVertexName(String vertexName) { + return setKeyValue(HISTORY_VERTEX_NAME, vertexName); + } + + HistoryLineBuilder addTaskId(int taskId) { + return setKeyValue(HISTORY_TASK_ID, String.valueOf(taskId)); + } + + HistoryLineBuilder addTaskAttemptId(int attemptId) { + return setKeyValue(HISTORY_ATTEMPT_ID, String.valueOf(attemptId)); + } + + HistoryLineBuilder addTime(String timeParam, long millis) { + return setKeyValue(timeParam, String.valueOf(millis)); + } + + HistoryLineBuilder addTime(String timeParam) { + return setKeyValue(timeParam, String.valueOf(System.currentTimeMillis())); + } + + HistoryLineBuilder addSuccessStatus(boolean status) { + return setKeyValue(HISTORY_SUCCEEDED, String.valueOf(status)); + } + + + private HistoryLineBuilder setKeyValue(String key, String value) { + sb.append(", ").append(key).append("=").append(value); + return this; + } + + @Override + public String toString() { + return sb.toString(); + } + } +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 360f63e..ed8cb2f 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.llap.daemon.HistoryLogger; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; @@ -60,6 +61,7 @@ private static final Logger LOG = Logger.getLogger(ContainerRunnerImpl.class); + private final int numExecutors; private final ListeningExecutorService executorService; private final AtomicReference localAddress; @@ -127,7 +129,10 @@ private static String createAppSpecificLocalDir(String baseDir, String applicati @Override public void queueContainer(RunContainerRequestProto request) throws IOException { - LOG.info("Queing container for execution: " + request); + // TODO HIVE-7926 Fill in additional details when known. + HistoryLogger.logFragmentStart(request.getApplicationIdString(), request.getContainerIdString(), + localAddress.get().getHostName(), null, null, -1, -1); + LOG.info("Queuing container for execution: " + request); Map env = new HashMap(); // TODO What else is required in this environment map. 
@@ -166,7 +171,7 @@ public void queueContainer(RunContainerRequestProto request) throws IOException ContainerRunnerCallable callable = new ContainerRunnerCallable(request, new Configuration(getConfig()), new ExecutionContextImpl(localAddress.get().getHostName()), env, localDirs, - workingDir, credentials, memoryPerExecutor); + workingDir, credentials, memoryPerExecutor, localAddress.get().getHostName()); ListenableFuture future = executorService .submit(callable); Futures.addCallback(future, new ContainerRunnerCallback(request, callable)); @@ -186,12 +191,14 @@ public void queueContainer(RunContainerRequestProto request) throws IOException private final Credentials credentials; private final long memoryAvailable; private volatile TezChild tezChild; + private final String localHostname; + private volatile long startTime; ContainerRunnerCallable(RunContainerRequestProto request, Configuration conf, ExecutionContext executionContext, Map envMap, String[] localDirs, String workingDir, Credentials credentials, - long memoryAvailable) { + long memoryAvailable, String localHostname) { this.request = request; this.conf = conf; this.executionContext = executionContext; @@ -201,11 +208,13 @@ public void queueContainer(RunContainerRequestProto request) throws IOException this.objectRegistry = new ObjectRegistryImpl(); this.credentials = credentials; this.memoryAvailable = memoryAvailable; + this.localHostname = localHostname; } @Override public ContainerExecutionResult call() throws Exception { + this.startTime = System.currentTimeMillis(); Stopwatch sw = new Stopwatch().start(); tezChild = new TezChild(conf, request.getAmHost(), request.getAmPort(), @@ -258,6 +267,9 @@ public void onSuccess(ContainerExecutionResult result) { request.getContainerIdString()); break; } + HistoryLogger + .logFragmentEnd(request.getApplicationIdString(), request.getContainerIdString(), + localAddress.get().getHostName(), null, null, -1, -1, containerRunnerCallable.startTime, true); } @Override @@ 
-269,6 +281,11 @@ public void onFailure(Throwable t) { if (tezChild != null) { tezChild.shutdown(); } + HistoryLogger + .logFragmentEnd(request.getApplicationIdString(), request.getContainerIdString(), + localAddress.get().getHostName(), null, null, -1, -1, + containerRunnerCallable.startTime, false); } } + } diff --git llap-server/src/test/resources/llap-daemon-log4j.properties llap-server/src/test/resources/llap-daemon-log4j.properties index 5a5063f..a5f57df 100644 --- llap-server/src/test/resources/llap-daemon-log4j.properties +++ llap-server/src/test/resources/llap-daemon-log4j.properties @@ -19,27 +19,44 @@ llap.daemon.root.logger=INFO,console llap.daemon.log.dir=. llap.daemon.log.file=llapdaemon.log +llap.daemon.historylog.file=llapdaemon_history.log +log4j.logger.org.apache.hadoop.hive.llap.daemon.HistoryLogger=INFO,HISTORYAPPENDER +log4j.additivity.org.apache.hadoop.hive.llap.daemon.HistoryLogger=false + # Define the root logger to the system property "llap.daemon.root.logger". log4j.rootLogger=${llap.daemon.root.logger} # Logging Threshold log4j.threshold=ALL + # Null Appender log4j.appender.NullAppender=org.apache.log4j.varia.NullAppender -# + + +# History Events appender +log4j.appender.HISTORYAPPENDER=org.apache.log4j.RollingFileAppender +log4j.appender.HISTORYAPPENDER.File=${llap.daemon.log.dir}/${llap.daemon.historylog.file} +log4j.appender.HISTORYAPPENDER.MaxFileSize=${llap.daemon.log.maxfilesize} +log4j.appender.HISTORYAPPENDER.MaxBackupIndex=${llap.daemon.log.maxbackupindex} +log4j.appender.HISTORYAPPENDER.layout=org.apache.log4j.EnhancedPatternLayout +log4j.appender.HISTORYAPPENDER.layout.ConversionPattern=%m%n + + + # Rolling File Appender - cap space usage at 5gb.
# llap.daemon.log.maxfilesize=256MB llap.daemon.log.maxbackupindex=20 log4j.appender.RFA=org.apache.log4j.RollingFileAppender log4j.appender.RFA.File=${llap.daemon.log.dir}/${llap.daemon.log.file} +log4j.appender.RFA.Append=true log4j.appender.RFA.MaxFileSize=${llap.daemon.log.maxfilesize} log4j.appender.RFA.MaxBackupIndex=${llap.daemon.log.maxbackupindex} -log4j.appender.RFA.layout=org.apache.log4j.PatternLayout +log4j.appender.RFA.layout=org.apache.log4j.EnhancedPatternLayout # Pattern format: Date LogLevel LoggerName LogMessage log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %p %c: %m%n @@ -59,7 +75,7 @@ log4j.appender.DRFA.DatePattern=.yyyy-MM-dd # 30-day backup #log4j.appender.DRFA.MaxBackupIndex=30 -log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout +log4j.appender.DRFA.layout=org.apache.log4j.EnhancedPatternLayout # Pattern format: Date LogLevel LoggerName LogMessage log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} [%t] %p %c: %m%n @@ -74,5 +90,5 @@ log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} [%t] %p %c: %m%n log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err -log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout=org.apache.log4j.EnhancedPatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} [%t] %p %c{2} : %m%n