diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 2f9dea0..ec67417 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -62,6 +62,7 @@
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
+import org.apache.log4j.MDC;
 import org.apache.log4j.NDC;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
@@ -185,8 +186,16 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws
         vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber());
     // This is the start of container-annotated logging.
-    // TODO Reduce the length of this string. Way too verbose at the moment.
-    NDC.push(fragmentIdString);
+    final String dagId = attemptId.getTaskID().getVertexID().getDAGId().toString();
+    final String queryId = vertex.getHiveQueryId();
+    MDC.put("dagId", dagId);
+    MDC.put("queryId", queryId);
+    // TODO: Ideally we want tez to use CallableWithMdc that retains the MDC for threads created in
+    // thread pool. For now, we will push both dagId and queryId into NDC and the custom thread
+    // pool that we use for task execution and llap io (StatsRecordingThreadPool) will pop them
+    // using reflection and update the MDC.
+    NDC.push(dagId);
+    NDC.push(queryId);
     Scheduler.SubmissionState submissionState;
     SubmitWorkResponseProto.Builder responseBuilder = SubmitWorkResponseProto.newBuilder();
     try {
@@ -246,7 +255,8 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws
         metrics.incrExecutorTotalRequestsHandled();
       }
     } finally {
-      NDC.pop();
+      MDC.clear();
+      NDC.clear();
     }
 
     responseBuilder.setSubmissionState(SubmissionStateProto.valueOf(submissionState.name()));
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index c7e9d32..91b8727 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -287,6 +287,8 @@ private void initializeLogging(final Configuration conf) {
     URL llap_l4j2 = LlapDaemon.class.getClassLoader().getResource(LOG4j2_PROPERTIES_FILE);
     if (llap_l4j2 != null) {
       final boolean async = LogUtils.checkAndSetAsyncLogging(conf);
+      // required for MDC based routing appender so that child threads can inherit the MDC context
+      System.setProperty("isThreadContextMapInheritable", "true");
       Configurator.initialize("LlapDaemonLog4j2", llap_l4j2.toString());
       long end = System.currentTimeMillis();
       LOG.warn("LLAP daemon logging initialized from {} in {} ms. Async: {}",
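The `isThreadContextMapInheritable` property only takes effect if it is set before log4j2 installs its ThreadContext map, which is why LlapDaemon sets it immediately before `Configurator.initialize(...)`. A minimal standalone sketch of the behavior the patch relies on (not part of the patch itself):

```java
import org.apache.logging.log4j.ThreadContext;

public class InheritableContextSketch {
  public static void main(String[] args) throws InterruptedException {
    // Must run before the ThreadContext class is initialized (i.e. before any
    // logging call); otherwise the default, non-inheritable map is installed.
    System.setProperty("isThreadContextMapInheritable", "true");

    ThreadContext.put("dagId", "dag_1");
    Thread child = new Thread(() ->
        // With an InheritableThreadLocal-backed map the child sees the parent's
        // entries, so per-DAG routing keeps working in threads a task spawns.
        System.out.println("child sees dagId=" + ThreadContext.get("dagId")));
    child.start();
    child.join();
  }
}
```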
Async: {}", diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java index 9b3ce7e..b20779c 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java @@ -17,8 +17,10 @@ */ package org.apache.hadoop.hive.llap.daemon.impl; +import java.lang.reflect.Field; import java.util.List; import java.util.Map; +import java.util.Stack; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; @@ -30,6 +32,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.llap.LlapUtil; import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader; +import org.apache.log4j.MDC; +import org.apache.tez.common.CallableWithNdc; import org.apache.tez.common.counters.FileSystemCounter; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.runtime.task.TaskRunner2Callable; @@ -100,10 +104,40 @@ public V call() throws Exception { // clone thread local file system statistics List statsBefore = LlapUtil.cloneThreadLocalFileSystemStatistics(); + setupMDCFromNDC(actualCallable); try { return actualCallable.call(); } finally { updateFileSystemCounters(statsBefore, actualCallable); + MDC.clear(); + } + } + + private void setupMDCFromNDC(final Callable actualCallable) { + if (actualCallable instanceof CallableWithNdc) { + CallableWithNdc callableWithNdc = (CallableWithNdc) actualCallable; + try { + Field field = callableWithNdc.getClass().getSuperclass().getDeclaredField("ndcStack"); + field.setAccessible(true); + Stack ndcStack = (Stack) field.get(callableWithNdc); + final String queryId = (String) ndcStack.pop(); + final String dagId = (String) ndcStack.pop(); + MDC.put("dagId", dagId); + MDC.put("queryId", queryId); + ndcStack.push(dagId); + ndcStack.push(queryId); + if (LOG.isDebugEnabled()) { + LOG.debug("Received dagId: {} queryId: {} instanceType: {}", + dagId, queryId, actualCallable.getClass().getSimpleName()); + } + } catch (Exception e) { + LOG.warn("Not setting up MDC as NDC stack cannot be accessed reflectively for" + + " instance type: {} exception type: {}", + actualCallable.getClass().getSimpleName(), e.getClass().getSimpleName()); + } + } else { + LOG.warn("Not setting up MDC as unknown callable instance type received: {}", + actualCallable.getClass().getSimpleName()); } } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index fb64f0b..47dec4f 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -50,6 +50,8 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.log4j.MDC; +import org.apache.log4j.NDC; import org.apache.tez.common.CallableWithNdc; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.security.JobTokenIdentifier; @@ -166,6 +168,12 @@ public long getStartTime() { @Override protected TaskRunner2Result callInternal() throws Exception { + final String queryId = NDC.pop(); + final String dagId = NDC.pop(); + 
MDC.put("dagId", dagId); + MDC.put("queryId", queryId); + NDC.push(dagId); + NDC.push(queryId); isStarted.set(true); this.startTime = System.currentTimeMillis(); @@ -266,6 +274,7 @@ public LlapTaskUmbilicalProtocol run() throws Exception { } } finally { IOContextMap.clearThreadAttempt(attemptId); + MDC.clear(); } } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java index f870435..d0a464b 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.ConsumerFeedback; import org.apache.hadoop.hive.llap.counters.FragmentCountersMap; import org.apache.hadoop.hive.llap.counters.LlapIOCounters; @@ -56,8 +57,10 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hive.common.util.HiveStringUtils; import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.runtime.api.impl.TaskSpec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; public class LlapInputFormat implements InputFormat, VectorizedInputFormatInterface, SelfDescribingInputFormatInterface, @@ -148,7 +151,11 @@ public LlapRecordReader( this.columnIds = includedCols; this.sarg = ConvertAstToSearchArg.createFromConf(job); this.columnNames = ColumnProjectionUtils.getReadColumnNames(job); - String fragmentId = LlapTezUtils.getFragmentId(job); + final String fragmentId = LlapTezUtils.getFragmentId(job); + final String dagId = LlapTezUtils.getDagId(job); + final String queryId = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID); + MDC.put("dagId", dagId); + MDC.put("queryId", queryId); TezCounters taskCounters = null; if (fragmentId != null) { taskCounters = FragmentCountersMap.getCountersForFragment(fragmentId); @@ -298,6 +305,7 @@ public void close() throws IOException { LlapIoImpl.LOG.info("Llap counters: {}" ,counters); // This is where counters are logged! 
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
index f870435..d0a464b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.counters.FragmentCountersMap;
 import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
@@ -56,8 +57,10 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hive.common.util.HiveStringUtils;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
 
 public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowBatch>,
     VectorizedInputFormatInterface, SelfDescribingInputFormatInterface,
@@ -148,7 +151,11 @@ public LlapRecordReader(
     this.columnIds = includedCols;
     this.sarg = ConvertAstToSearchArg.createFromConf(job);
     this.columnNames = ColumnProjectionUtils.getReadColumnNames(job);
-    String fragmentId = LlapTezUtils.getFragmentId(job);
+    final String fragmentId = LlapTezUtils.getFragmentId(job);
+    final String dagId = LlapTezUtils.getDagId(job);
+    final String queryId = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID);
+    MDC.put("dagId", dagId);
+    MDC.put("queryId", queryId);
     TezCounters taskCounters = null;
     if (fragmentId != null) {
       taskCounters = FragmentCountersMap.getCountersForFragment(fragmentId);
@@ -298,6 +305,7 @@ public void close() throws IOException {
     LlapIoImpl.LOG.info("Llap counters: {}" ,counters); // This is where counters are logged!
     feedback.stop();
     rethrowErrorIfAny();
+    MDC.clear();
   }
 
   private void rethrowErrorIfAny() throws IOException {
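Note that LlapRecordReader populates org.slf4j.MDC while the daemon classes above use org.apache.log4j.MDC. Assuming the usual Hive logging setup (the log4j-slf4j binding plus the log4j-1.2-api bridge), both facades write to the same underlying log4j2 ThreadContext, so either one feeds the ${ctx:dagId} lookup used by the routing appender. A small sketch of that assumption:

```java
import org.apache.logging.log4j.ThreadContext;

public class MdcBridgeSketch {
  public static void main(String[] args) {
    org.slf4j.MDC.put("dagId", "dag_1");            // slf4j facade (LlapRecordReader)
    org.apache.log4j.MDC.put("queryId", "query_1"); // log4j 1.x bridge (daemon classes)

    // With both bridges routing to log4j2, the entries land in one context map:
    System.out.println(ThreadContext.get("dagId"));   // expected: dag_1
    System.out.println(ThreadContext.get("queryId")); // expected: query_1
  }
}
```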
diff --git a/llap-server/src/main/resources/llap-daemon-log4j2.properties b/llap-server/src/main/resources/llap-daemon-log4j2.properties
index c5166e3..24f8685 100644
--- a/llap-server/src/main/resources/llap-daemon-log4j2.properties
+++ b/llap-server/src/main/resources/llap-daemon-log4j2.properties
@@ -28,14 +28,14 @@ property.llap.daemon.log.maxfilesize = 256MB
 property.llap.daemon.log.maxbackupindex = 20
 
 # list of all appenders
-appenders = console, RFA, HISTORYAPPENDER
+appenders = console, RFA, HISTORYAPPENDER, routing
 
 # console appender
 appender.console.type = Console
 appender.console.name = console
 appender.console.target = SYSTEM_ERR
 appender.console.layout.type = PatternLayout
-appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} [%t%x] %p %c{2} : %m%n
+appender.console.layout.pattern = %d{ISO8601} %5p [%t%x] %c{2}: %m%n
 
 # rolling file appender
 appender.RFA.type = RollingRandomAccessFile
@@ -43,7 +43,7 @@ appender.RFA.name = RFA
 appender.RFA.fileName = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.log.file}
 appender.RFA.filePattern = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.log.file}_%i
 appender.RFA.layout.type = PatternLayout
-appender.RFA.layout.pattern = %d{ISO8601} %-5p [%t%x]: %c{2} (%F:%M(%L)) - %m%n
+appender.RFA.layout.pattern = %d{ISO8601} %5p [%t%x] %c{2}: %m%n
 appender.RFA.policies.type = Policies
 appender.RFA.policies.size.type = SizeBasedTriggeringPolicy
 appender.RFA.policies.size.size = ${sys:llap.daemon.log.maxfilesize}
@@ -63,6 +63,36 @@ appender.HISTORYAPPENDER.policies.size.size = ${sys:llap.daemon.log.maxfilesize}
 appender.HISTORYAPPENDER.strategy.type = DefaultRolloverStrategy
 appender.HISTORYAPPENDER.strategy.max = ${sys:llap.daemon.log.maxbackupindex}
 
+# routing file appender
+appender.routing.type = Routing
+appender.routing.name = routing
+appender.routing.routes.type = Routes
+appender.routing.routes.pattern = $${ctx:dagId}
+
+# default route
+appender.routing.routes.route-default.type = Route
+appender.routing.routes.route-default.key = $${ctx:dagId}
+appender.routing.routes.route-default.file-default.type = RollingRandomAccessFile
+appender.routing.routes.route-default.file-default.name = file-default
+appender.routing.routes.route-default.file-default.fileName = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.log.file}
+appender.routing.routes.route-default.file-default.filePattern = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.log.file}_%d{MM-dd-yy-HH-mm}
+appender.routing.routes.route-default.file-default.layout.type = PatternLayout
+appender.routing.routes.route-default.file-default.layout.pattern = %d{ISO8601} %5p [%t] %c{2}: %m%n
+appender.routing.routes.route-default.file-default.policies.type = Policies
+appender.routing.routes.route-default.file-default.policies.time.type = TimeBasedTriggeringPolicy
+appender.routing.routes.route-default.file-default.policies.time.interval = 1
+appender.routing.routes.route-default.file-default.policies.time.modulate = true
+appender.routing.routes.route-default.file-default.strategy.type = DefaultRolloverStrategy
+appender.routing.routes.route-default.file-default.strategy.max = ${sys:llap.daemon.log.maxbackupindex}
+
+# mdc key based route
+appender.routing.routes.route-mdc.type = Route
+appender.routing.routes.route-mdc.file-mdc.type = RandomAccessFile
+appender.routing.routes.route-mdc.file-mdc.name = file-${ctx:dagId}
+appender.routing.routes.route-mdc.file-mdc.fileName = ${sys:llap.daemon.log.dir}/${ctx:dagId}.log
+appender.routing.routes.route-mdc.file-mdc.layout.type = PatternLayout
+appender.routing.routes.route-mdc.file-mdc.layout.pattern = %d{ISO8601} %5p [%t] %c{2}: %m%n
+
 # list of all loggers
 loggers = NIOServerCnxn, ClientCnxnSocketNIO, DataNucleus, Datastore, JPOX, HistoryLogger, LlapIoImpl, LlapIoOrc, LlapIoCache, LlapIoLocking
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java
index 776aef2..ab5cb30 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java
@@ -14,24 +14,14 @@
-import java.text.NumberFormat;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.input.MultiMRInput;
 
-import com.google.common.base.Joiner;
-
 @InterfaceAudience.Private
 public class LlapTezUtils {
   public static boolean isSourceOfInterest(String inputClassName) {
@@ -40,6 +30,10 @@ public static boolean isSourceOfInterest(String inputClassName) {
         MultiMRInput.class.getName()) || inputClassName.equals(MRInput.class.getName()));
   }
 
+  public static String getDagId(final JobConf job) {
+    return MRInputHelpers.getDagIdString(job);
+  }
+
   public static String getFragmentId(final JobConf job) {
     String taskAttemptId = MRInputHelpers.getTaskAttemptIdString(job);
     if (taskAttemptId != null) {
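With the routing appender configuration above in place, per-DAG routing is driven entirely by whether the logging thread's context carries a dagId: when ${ctx:dagId} resolves, the event goes to a dedicated <dagId>.log file; when it stays unresolved it matches the route-default key and lands in the regular daemon log. A hedged end-to-end sketch (assumes the properties file above is the active log4j2 configuration; the dag id format is illustrative):

```java
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.ThreadContext;

public class RoutingDemo {
  private static final Logger LOG = LogManager.getLogger(RoutingDemo.class);

  public static void main(String[] args) {
    // No dagId in the context: ${ctx:dagId} stays unresolved, which matches the
    // route-default key, so this goes to the regular daemon log.
    LOG.info("daemon housekeeping message");

    // dagId set: the Routing appender creates (or reuses) a RandomAccessFile
    // appender writing to ${sys:llap.daemon.log.dir}/dag_1466122566003_0001_1.log.
    ThreadContext.put("dagId", "dag_1466122566003_0001_1");
    LOG.info("fragment execution message for one DAG");
    ThreadContext.remove("dagId");
  }
}
```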