diff --git a/llap-server/bin/llap-daemon-env.sh b/llap-server/bin/llap-daemon-env.sh index 02c4315..7006d43 100755 --- a/llap-server/bin/llap-daemon-env.sh +++ b/llap-server/bin/llap-daemon-env.sh @@ -32,7 +32,7 @@ #export LLAP_DAEMON_USER_CLASSPATH= # Logger setup for LLAP daemon -#export LLAP_DAEMON_LOGGER=RFA +#export LLAP_DAEMON_LOGGER=dag-routing # Log level for LLAP daemon #export LLAP_DAEMON_LOG_LEVEL=INFO diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 2f9dea0..103115e 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; import org.apache.hadoop.hive.llap.security.LlapSignerImpl; import org.apache.hadoop.hive.llap.tez.Converters; +import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -62,6 +63,7 @@ import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper; +import org.apache.log4j.MDC; import org.apache.log4j.NDC; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; @@ -185,8 +187,19 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber()); // This is the start of container-annotated logging. - // TODO Reduce the length of this string. Way too verbose at the moment. - NDC.push(fragmentIdString); + final String dagId = attemptId.getTaskID().getVertexID().getDAGId().toString(); + final String queryId = vertex.getHiveQueryId(); + final String fragId = LlapTezUtils.stripAttemptPrefix(fragmentIdString); + MDC.put("dagId", dagId); + MDC.put("queryId", queryId); + MDC.put("fragmentId", fragId); + // TODO: Ideally we want tez to use CallableWithMdc that retains the MDC for threads created in + // thread pool. For now, we will push both dagId and queryId into NDC and the custom thread + // pool that we use for task execution and llap io (StatsRecordingThreadPool) will pop them + // using reflection and update the MDC. 
+    NDC.push(dagId);
+    NDC.push(queryId);
+    NDC.push(fragId);
     Scheduler.SubmissionState submissionState;
     SubmitWorkResponseProto.Builder responseBuilder = SubmitWorkResponseProto.newBuilder();
     try {
@@ -246,7 +259,8 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws
         metrics.incrExecutorTotalRequestsHandled();
       }
     } finally {
-      NDC.pop();
+      MDC.clear();
+      NDC.clear();
     }
 
     responseBuilder.setSubmissionState(SubmissionStateProto.valueOf(submissionState.name()));
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index c7e9d32..91b8727 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -287,6 +287,8 @@ private void initializeLogging(final Configuration conf) {
     URL llap_l4j2 = LlapDaemon.class.getClassLoader().getResource(LOG4j2_PROPERTIES_FILE);
     if (llap_l4j2 != null) {
       final boolean async = LogUtils.checkAndSetAsyncLogging(conf);
+      // required for MDC based routing appender so that child threads can inherit the MDC context
+      System.setProperty("isThreadContextMapInheritable", "true");
       Configurator.initialize("LlapDaemonLog4j2", llap_l4j2.toString());
       long end = System.currentTimeMillis();
       LOG.warn("LLAP daemon logging initialized from {} in {} ms. Async: {}",
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
index 9b3ce7e..e4c37a3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
@@ -17,8 +17,10 @@
  */
 package org.apache.hadoop.hive.llap.daemon.impl;
 
+import java.lang.reflect.Field;
 import java.util.List;
 import java.util.Map;
+import java.util.Stack;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.FutureTask;
@@ -30,6 +32,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader;
+import org.apache.log4j.MDC;
+import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.counters.FileSystemCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.runtime.task.TaskRunner2Callable;
@@ -100,10 +104,43 @@ public V call() throws Exception {
       // clone thread local file system statistics
       List statsBefore = LlapUtil.cloneThreadLocalFileSystemStatistics();
 
+      setupMDCFromNDC(actualCallable);
       try {
         return actualCallable.call();
       } finally {
         updateFileSystemCounters(statsBefore, actualCallable);
+        MDC.clear();
+      }
+    }
+
+    private void setupMDCFromNDC(final Callable actualCallable) {
+      if (actualCallable instanceof CallableWithNdc) {
+        CallableWithNdc callableWithNdc = (CallableWithNdc) actualCallable;
+        try {
+          Field field = callableWithNdc.getClass().getSuperclass().getDeclaredField("ndcStack");
+          field.setAccessible(true);
+          Stack ndcStack = (Stack) field.get(callableWithNdc);
+          final String fragmentId = (String) ndcStack.pop();
+          final String queryId = (String) ndcStack.pop();
+          final String dagId = (String) ndcStack.pop();
+          MDC.put("dagId", dagId);
+          MDC.put("queryId", queryId);
+          MDC.put("fragmentId", fragmentId);
+          ndcStack.push(dagId);
ndcStack.push(queryId); + ndcStack.push(fragmentId); + if (LOG.isDebugEnabled()) { + LOG.debug("Received dagId: {} queryId: {} instanceType: {}", + dagId, queryId, actualCallable.getClass().getSimpleName()); + } + } catch (Exception e) { + LOG.warn("Not setting up MDC as NDC stack cannot be accessed reflectively for" + + " instance type: {} exception type: {}", + actualCallable.getClass().getSimpleName(), e.getClass().getSimpleName()); + } + } else { + LOG.warn("Not setting up MDC as unknown callable instance type received: {}", + actualCallable.getClass().getSimpleName()); } } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index fb64f0b..760d160 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -50,6 +50,8 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.log4j.MDC; +import org.apache.log4j.NDC; import org.apache.tez.common.CallableWithNdc; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.security.JobTokenIdentifier; @@ -166,6 +168,15 @@ public long getStartTime() { @Override protected TaskRunner2Result callInternal() throws Exception { + final String fragId = NDC.pop(); + final String queryId = NDC.pop(); + final String dagId = NDC.pop(); + MDC.put("dagId", dagId); + MDC.put("queryId", queryId); + MDC.put("fragmentId", fragId); + NDC.push(dagId); + NDC.push(queryId); + NDC.push(fragId); isStarted.set(true); this.startTime = System.currentTimeMillis(); @@ -266,6 +277,7 @@ public LlapTaskUmbilicalProtocol run() throws Exception { } } finally { IOContextMap.clearThreadAttempt(attemptId); + MDC.clear(); } } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java index cc4e10b..c5d0680 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java @@ -42,6 +42,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.ConsumerFeedback; import org.apache.hadoop.hive.llap.counters.FragmentCountersMap; import org.apache.hadoop.hive.llap.counters.LlapIOCounters; @@ -84,8 +85,10 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hive.common.util.HiveStringUtils; import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.runtime.api.impl.TaskSpec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; public class LlapInputFormat implements InputFormat, VectorizedInputFormatInterface, SelfDescribingInputFormatInterface, @@ -189,9 +192,14 @@ public LlapRecordReader(JobConf job, FileSplit split, List includedCols this.columnIds = includedCols; this.sarg = ConvertAstToSearchArg.createFromConf(job); this.columnNames = ColumnProjectionUtils.getReadColumnNames(job); - String fragmentId = LlapTezUtils.getFragmentId(job); + final String fragmentId = LlapTezUtils.getFragmentId(job); + final String dagId = LlapTezUtils.getDagId(job); + final String queryId = HiveConf.getVar(job, 
+          HiveConf.ConfVars.HIVEQUERYID);
+      MDC.put("dagId", dagId);
+      MDC.put("queryId", queryId);
       TezCounters taskCounters = null;
       if (fragmentId != null) {
+        MDC.put("fragmentId", fragmentId);
         taskCounters = FragmentCountersMap.getCountersForFragment(fragmentId);
         LOG.info("Received fragment id: {}", fragmentId);
       } else {
@@ -341,6 +349,7 @@ public void close() throws IOException {
       LlapIoImpl.LOG.info("Llap counters: {}" ,counters); // This is where counters are logged!
       feedback.stop();
       rethrowErrorIfAny();
+      MDC.clear();
     }
 
     private void rethrowErrorIfAny() throws IOException {
diff --git a/llap-server/src/main/resources/llap-daemon-log4j2.properties b/llap-server/src/main/resources/llap-daemon-log4j2.properties
index 7488ba2..7254931 100644
--- a/llap-server/src/main/resources/llap-daemon-log4j2.properties
+++ b/llap-server/src/main/resources/llap-daemon-log4j2.properties
@@ -28,14 +28,14 @@ property.llap.daemon.log.maxfilesize = 256MB
 property.llap.daemon.log.maxbackupindex = 20
 
 # list of all appenders
-appenders = console, RFA, HISTORYAPPENDER
+appenders = console, RFA, HISTORYAPPENDER, dag-routing, query-routing
 
 # console appender
 appender.console.type = Console
 appender.console.name = console
 appender.console.target = SYSTEM_ERR
 appender.console.layout.type = PatternLayout
-appender.console.layout.pattern = %d{ISO8601} %5p [%t%x] %c{2}: %m%n
+appender.console.layout.pattern = %d{ISO8601} %5p [%t (%X{fragmentId})] %c{2}: %m%n
 
 # rolling file appender
 appender.RFA.type = RollingRandomAccessFile
@@ -43,7 +43,7 @@ appender.RFA.name = RFA
 appender.RFA.fileName = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.log.file}
 appender.RFA.filePattern = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.log.file}_%i
 appender.RFA.layout.type = PatternLayout
-appender.RFA.layout.pattern = %d{ISO8601} %5p [%t%x] %c{2}: %m%n
+appender.RFA.layout.pattern = %d{ISO8601} %5p [%t (%X{fragmentId})] %c{2}: %m%n
 appender.RFA.policies.type = Policies
 appender.RFA.policies.size.type = SizeBasedTriggeringPolicy
 appender.RFA.policies.size.size = ${sys:llap.daemon.log.maxfilesize}
@@ -63,6 +63,62 @@ appender.HISTORYAPPENDER.policies.size.size = ${sys:llap.daemon.log.maxfilesize}
 appender.HISTORYAPPENDER.strategy.type = DefaultRolloverStrategy
 appender.HISTORYAPPENDER.strategy.max = ${sys:llap.daemon.log.maxbackupindex}
 
+# dagId based routing file appender
+appender.dag-routing.type = Routing
+appender.dag-routing.name = dag-routing
+appender.dag-routing.routes.type = Routes
+appender.dag-routing.routes.pattern = $${ctx:dagId}
+# default route
+appender.dag-routing.routes.route-default.type = Route
+appender.dag-routing.routes.route-default.key = $${ctx:dagId}
+appender.dag-routing.routes.route-default.file-default.type = RollingRandomAccessFile
+appender.dag-routing.routes.route-default.file-default.name = file-default
+appender.dag-routing.routes.route-default.file-default.fileName = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.log.file}
+appender.dag-routing.routes.route-default.file-default.filePattern = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.log.file}_%d{MM-dd-yy-HH-mm}
+appender.dag-routing.routes.route-default.file-default.layout.type = PatternLayout
+appender.dag-routing.routes.route-default.file-default.layout.pattern = %d{ISO8601} %5p [%t (%X{fragmentId})] %c{2}: %m%n
+appender.dag-routing.routes.route-default.file-default.policies.type = Policies
+appender.dag-routing.routes.route-default.file-default.policies.time.type = TimeBasedTriggeringPolicy
+appender.dag-routing.routes.route-default.file-default.policies.time.interval = 1
+appender.dag-routing.routes.route-default.file-default.policies.time.modulate = true
+appender.dag-routing.routes.route-default.file-default.strategy.type = DefaultRolloverStrategy
+appender.dag-routing.routes.route-default.file-default.strategy.max = ${sys:llap.daemon.log.maxbackupindex}
+# dagId based route
+appender.dag-routing.routes.route-mdc.type = Route
+appender.dag-routing.routes.route-mdc.file-mdc.type = RandomAccessFile
+appender.dag-routing.routes.route-mdc.file-mdc.name = file-mdc
+appender.dag-routing.routes.route-mdc.file-mdc.fileName = ${sys:llap.daemon.log.dir}/${ctx:dagId}.log
+appender.dag-routing.routes.route-mdc.file-mdc.layout.type = PatternLayout
+appender.dag-routing.routes.route-mdc.file-mdc.layout.pattern = %d{ISO8601} %5p [%t (%X{fragmentId})] %c{2}: %m%n
+
+# queryId based routing file appender
+appender.query-routing.type = Routing
+appender.query-routing.name = query-routing
+appender.query-routing.routes.type = Routes
+appender.query-routing.routes.pattern = $${ctx:queryId}
+# default route
+appender.query-routing.routes.route-default.type = Route
+appender.query-routing.routes.route-default.key = $${ctx:queryId}
+appender.query-routing.routes.route-default.file-default.type = RollingRandomAccessFile
+appender.query-routing.routes.route-default.file-default.name = file-default
+appender.query-routing.routes.route-default.file-default.fileName = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.log.file}
+appender.query-routing.routes.route-default.file-default.filePattern = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.log.file}_%d{MM-dd-yy-HH-mm}
+appender.query-routing.routes.route-default.file-default.layout.type = PatternLayout
+appender.query-routing.routes.route-default.file-default.layout.pattern = %d{ISO8601} %5p [%t (%X{fragmentId})] %c{2}: %m%n
+appender.query-routing.routes.route-default.file-default.policies.type = Policies
+appender.query-routing.routes.route-default.file-default.policies.time.type = TimeBasedTriggeringPolicy
+appender.query-routing.routes.route-default.file-default.policies.time.interval = 1
+appender.query-routing.routes.route-default.file-default.policies.time.modulate = true
+appender.query-routing.routes.route-default.file-default.strategy.type = DefaultRolloverStrategy
+appender.query-routing.routes.route-default.file-default.strategy.max = ${sys:llap.daemon.log.maxbackupindex}
+# queryId based route
+appender.query-routing.routes.route-mdc.type = Route
+appender.query-routing.routes.route-mdc.file-mdc.type = RandomAccessFile
+appender.query-routing.routes.route-mdc.file-mdc.name = file-mdc
+appender.query-routing.routes.route-mdc.file-mdc.fileName = ${sys:llap.daemon.log.dir}/${ctx:queryId}.log
+appender.query-routing.routes.route-mdc.file-mdc.layout.type = PatternLayout
+appender.query-routing.routes.route-mdc.file-mdc.layout.pattern = %d{ISO8601} %5p [%t (%X{fragmentId})] %c{2}: %m%n
+
 # list of all loggers
 loggers = NIOServerCnxn, ClientCnxnSocketNIO, DataNucleus, Datastore, JPOX, HistoryLogger, LlapIoImpl, LlapIoOrc, LlapIoCache, LlapIoLocking
diff --git a/llap-server/src/main/resources/package.py b/llap-server/src/main/resources/package.py
index 83fe918..45ae50e 100644
--- a/llap-server/src/main/resources/package.py
+++ b/llap-server/src/main/resources/package.py
@@ -71,6 +71,7 @@ def main(args):
   parser.add_argument("--args", default="")
   parser.add_argument("--name", default="llap0")
   parser.add_argument("--loglevel", default="INFO")
+  parser.add_argument("--logger", default="dag-routing")
   parser.add_argument("--chaosmonkey", type=int, default=0)
   parser.add_argument("--slider-am-container-mb", type=int, default=1024)
   parser.add_argument("--slider-keytab-dir", default="")
@@ -120,6 +121,7 @@ def main(args):
     "name" : resource.clusterName,
     "daemon_args" : daemon_args,
     "daemon_loglevel" : args.loglevel,
+    "daemon_logger" : args.logger,
     "queue.string" : resource.queueString,
     "monkey_interval" : args.chaosmonkey,
     "monkey_percentage" : monkey_percentage,
diff --git a/llap-server/src/main/resources/templates.py b/llap-server/src/main/resources/templates.py
index 8baa927..79285a2 100644
--- a/llap-server/src/main/resources/templates.py
+++ b/llap-server/src/main/resources/templates.py
@@ -74,7 +74,7 @@
   "site.global.app_user": "yarn",
   "site.global.app_root": "${AGENT_WORK_ROOT}/app/install/",
   "site.global.app_tmp_dir": "${AGENT_WORK_ROOT}/tmp/",
-  "site.global.app_logger": "RFA",
+  "site.global.app_logger": "%(daemon_logger)s",
   "site.global.app_log_level": "%(daemon_loglevel)s",
   "site.global.additional_cp": "%(hadoop_home)s",
   "site.global.daemon_args": "%(daemon_args)s",
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java
index eda8862..d44d9cb 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java
@@ -14,24 +14,14 @@
 package org.apache.hadoop.hive.llap.tezplugins;
 
-import java.text.NumberFormat;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.input.MultiMRInput;
 
-import com.google.common.base.Joiner;
-
 @InterfaceAudience.Private
 public class LlapTezUtils {
   public static boolean isSourceOfInterest(String inputClassName) {
@@ -40,6 +30,10 @@ public static boolean isSourceOfInterest(String inputClassName) {
         MultiMRInput.class.getName()) || inputClassName.equals(MRInput.class.getName()));
   }
 
+  public static String getDagId(final JobConf job) {
+    return MRInputHelpers.getDagIdString(job);
+  }
+
   public static String getFragmentId(final JobConf job) {
     String taskAttemptId = job.get(MRInput.TEZ_MAPREDUCE_TASK_ATTEMPT_ID);
     if (taskAttemptId != null) {
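
Note on the routing mechanism (an illustrative sketch, not part of the patch): the
org.apache.log4j.MDC calls above reach log4j2's ThreadContext map through the
log4j-1.2-api bridge, and the Routing appenders pick a route by evaluating
$${ctx:dagId} / $${ctx:queryId} against that map. Below is a minimal standalone
example of the same contract using the native log4j2 API; the class name and id
values are placeholders, not anything defined by this patch:

    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.apache.logging.log4j.ThreadContext;

    public class MdcRoutingSketch {
      static {
        // Same property LlapDaemon sets before Configurator.initialize(); without
        // it, threads created later would start with an empty ThreadContext map.
        System.setProperty("isThreadContextMapInheritable", "true");
      }

      private static final Logger LOG = LogManager.getLogger(MdcRoutingSketch.class);

      public static void main(String[] args) throws InterruptedException {
        // Keys that the routing configs resolve via ${ctx:...}; placeholder values.
        ThreadContext.put("dagId", "dag_0_0000_1");
        ThreadContext.put("queryId", "sample-query-id");
        ThreadContext.put("fragmentId", "sample-fragment-id");
        try {
          // With a query-routing appender like the one above, this line is
          // written to ${sys:llap.daemon.log.dir}/sample-query-id.log.
          LOG.info("fragment log line");

          // Because the context map is inheritable, the child thread carries the
          // same keys and its output routes to the same per-query file.
          Thread child = new Thread(() -> LOG.info("child thread log line"));
          child.start();
          child.join();
        } finally {
          ThreadContext.clearMap(); // analogous to the MDC.clear() calls in the patch
        }
      }
    }

When no queryId is set, $${ctx:queryId} is left unresolved and evaluates to the
literal string "${ctx:queryId}", which is also what route-default.key resolves to;
that is how the configs above steer daemon-level logging (no MDC set) into the
ordinary rolling file while per-query lines get their own <queryId>.log.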