diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 5731b2c..e7591af 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -278,6 +278,8 @@ private void initializeLogging(final Configuration conf) { URL llap_l4j2 = LlapDaemon.class.getClassLoader().getResource(LOG4j2_PROPERTIES_FILE); if (llap_l4j2 != null) { final boolean async = LogUtils.checkAndSetAsyncLogging(conf); + // required for MDC based routing appender so that child threads can inherit the MDC context + System.setProperty("isThreadContextMapInheritable", "true"); Configurator.initialize("LlapDaemonLog4j2", llap_l4j2.toString()); long end = System.currentTimeMillis(); LOG.warn("LLAP daemon logging initialized from {} in {} ms. Async: {}", diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index 74359fa..aee28dd 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -66,6 +66,7 @@ import org.apache.tez.runtime.task.TezTaskRunner2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; @@ -164,8 +165,14 @@ protected TaskRunner2Result callInternal() throws Exception { this.startTime = System.currentTimeMillis(); this.threadName = Thread.currentThread().getName(); + final String appIdString = request.getWorkSpec() + .getVertex() + .getVertexIdentifier() + .getApplicationIdString(); + updateMDC(taskSpec, appIdString); if (LOG.isDebugEnabled()) { - LOG.debug("canFinish: " + taskSpec.getTaskAttemptID() + ": " + canFinish()); + LOG.debug("dagId:" + taskSpec.getDagIdentifier() + "canFinish: " + taskSpec.getTaskAttemptID() + + ": " + canFinish()); } // Unregister from the AMReporter, since the task is now running. @@ -260,10 +267,20 @@ public LlapTaskUmbilicalProtocol run() throws Exception { } } } finally { + clearMDC(); IOContextMap.clearThreadAttempt(attemptId); } } + private void updateMDC(final TaskSpec taskSpec, final String appId) { + final String dagId = "_dag_id_" + taskSpec.getDagIdentifier(); + MDC.put("dagId", appId + dagId); + } + + private void clearMDC() { + MDC.clear(); + } + /** * Attempt to kill a running task. If the task has not started running, it will not start. * If it's already running, a kill request will be sent to it. diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java index 298f788..f3ca8c8 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java @@ -54,8 +54,10 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hive.common.util.HiveStringUtils; import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.runtime.api.impl.TaskSpec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -154,6 +156,8 @@ public LlapRecordReader( this.sarg = ConvertAstToSearchArg.createFromConf(job); this.columnNames = ColumnProjectionUtils.getReadColumnNames(job); String dagId = job.get("tez.mapreduce.dag.index"); + String appId = job.get("tez.mapreduce.application.id"); + updateMDC(dagId, appId); String vertexId = job.get("tez.mapreduce.vertex.index"); String taskId = job.get("tez.mapreduce.task.index"); String taskAttemptId = job.get("tez.mapreduce.task.attempt.index"); @@ -311,6 +315,15 @@ public void close() throws IOException { LlapIoImpl.LOG.info("Llap counters: {}" ,counters); // This is where counters are logged! feedback.stop(); rethrowErrorIfAny(); + clearMDC(); + } + + private void updateMDC(final String dagId, final String appId) { + MDC.put("dagId", appId + "_dag_id_" + dagId); + } + + private void clearMDC() { + MDC.clear(); } private void rethrowErrorIfAny() throws IOException { diff --git a/llap-server/src/main/resources/llap-daemon-log4j2.properties b/llap-server/src/main/resources/llap-daemon-log4j2.properties index c5166e3..751386d 100644 --- a/llap-server/src/main/resources/llap-daemon-log4j2.properties +++ b/llap-server/src/main/resources/llap-daemon-log4j2.properties @@ -28,7 +28,7 @@ property.llap.daemon.log.maxfilesize = 256MB property.llap.daemon.log.maxbackupindex = 20 # list of all appenders -appenders = console, RFA, HISTORYAPPENDER +appenders = console, RFA, HISTORYAPPENDER, routing # console appender appender.console.type = Console @@ -63,6 +63,36 @@ appender.HISTORYAPPENDER.policies.size.size = ${sys:llap.daemon.log.maxfilesize} appender.HISTORYAPPENDER.strategy.type = DefaultRolloverStrategy appender.HISTORYAPPENDER.strategy.max = ${sys:llap.daemon.log.maxbackupindex} +# routing file appender +appender.routing.type = Routing +appender.routing.name = routing +appender.routing.routes.type = Routes +appender.routing.routes.pattern = $${ctx:dagId} + +# default route +appender.routing.routes.route-default.type = Route +appender.routing.routes.route-default.key = $${ctx:dagId} +appender.routing.routes.route-default.file-default.type = RollingRandomAccessFile +appender.routing.routes.route-default.file-default.name = file-default +appender.routing.routes.route-default.file-default.fileName = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.log.file} +appender.routing.routes.route-default.file-default.filePattern = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.log.file}_%d{MM-dd-yy-HH-mm} +appender.routing.routes.route-default.file-default.layout.type = PatternLayout +appender.routing.routes.route-default.file-default.layout.pattern = %d{ISO8601} %-5p [%t%X]: %c{2} (%F:%M(%L)) - %m%n +appender.routing.routes.route-default.file-default.policies.type = Policies +appender.routing.routes.route-default.file-default.policies.time.type = TimeBasedTriggeringPolicy +appender.routing.routes.route-default.file-default.policies.time.interval = 1 +appender.routing.routes.route-default.file-default.policies.time.modulate = true +appender.routing.routes.route-default.file-default.strategy.type = DefaultRolloverStrategy +appender.routing.routes.route-default.file-default.strategy.max = ${sys:llap.daemon.log.maxbackupindex} + +# mdc key based route +appender.routing.routes.route-mdc.type = Route +appender.routing.routes.route-mdc.file-mdc.type = RandomAccessFile +appender.routing.routes.route-mdc.file-mdc.name = file-${ctx:dagId} +appender.routing.routes.route-mdc.file-mdc.fileName = ${sys:llap.daemon.log.dir}/${ctx:dagId}.log +appender.routing.routes.route-mdc.file-mdc.layout.type = PatternLayout +appender.routing.routes.route-mdc.file-mdc.layout.pattern = %d{ISO8601} %-5p [%t%X]: %c{2} (%F:%M(%L)) - %m%n + # list of all loggers loggers = NIOServerCnxn, ClientCnxnSocketNIO, DataNucleus, Datastore, JPOX, HistoryLogger, LlapIoImpl, LlapIoOrc, LlapIoCache, LlapIoLocking