diff --git a/llap-server/bin/llap-daemon-env.sh b/llap-server/bin/llap-daemon-env.sh
index 02c4315..14cab3d 100755
--- a/llap-server/bin/llap-daemon-env.sh
+++ b/llap-server/bin/llap-daemon-env.sh
@@ -32,7 +32,7 @@
 #export LLAP_DAEMON_USER_CLASSPATH=
 
 # Logger setup for LLAP daemon
-#export LLAP_DAEMON_LOGGER=RFA
+#export LLAP_DAEMON_LOGGER=query-routing
 
 # Log level for LLAP daemon
 #export LLAP_DAEMON_LOG_LEVEL=INFO
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 2f9dea0..103115e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -55,6 +55,7 @@
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
 import org.apache.hadoop.hive.llap.security.LlapSignerImpl;
 import org.apache.hadoop.hive.llap.tez.Converters;
+import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -62,6 +63,7 @@
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
+import org.apache.log4j.MDC;
 import org.apache.log4j.NDC;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
@@ -185,8 +187,19 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws
         vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber());
 
     // This is the start of container-annotated logging.
-    // TODO Reduce the length of this string. Way too verbose at the moment.
-    NDC.push(fragmentIdString);
+    final String dagId = attemptId.getTaskID().getVertexID().getDAGId().toString();
+    final String queryId = vertex.getHiveQueryId();
+    final String fragId = LlapTezUtils.stripAttemptPrefix(fragmentIdString);
+    MDC.put("dagId", dagId);
+    MDC.put("queryId", queryId);
+    MDC.put("fragmentId", fragId);
+    // TODO: Ideally we want Tez to use a CallableWithMdc that retains the MDC for threads
+    // created in the thread pool. For now, we push dagId, queryId and fragmentId into the NDC;
+    // the custom thread pool that we use for task execution and LLAP IO
+    // (StatsRecordingThreadPool) pops them via reflection and updates the MDC.
+    NDC.push(dagId);
+    NDC.push(queryId);
+    NDC.push(fragId);
     Scheduler.SubmissionState submissionState;
     SubmitWorkResponseProto.Builder responseBuilder = SubmitWorkResponseProto.newBuilder();
     try {
@@ -246,7 +259,8 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws
         metrics.incrExecutorTotalRequestsHandled();
       }
     } finally {
-      NDC.pop();
+      MDC.clear();
+      NDC.clear();
     }
 
     responseBuilder.setSubmissionState(SubmissionStateProto.valueOf(submissionState.name()));
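Annotation: the submitWork() change above stamps dagId/queryId/fragmentId into the MDC for the routing appenders, and mirrors the same three values onto the NDC so that worker threads, which cannot see the submitting thread's MDC, can recover them later. A minimal sketch of that put/clear discipline, against the same org.apache.log4j bridge API the patch compiles with (class and method names here are illustrative, not part of the patch):

    import org.apache.log4j.MDC;
    import org.apache.log4j.NDC;

    public class MdcScopeExample {
      // Runs one unit of work with query-scoped logging context; ids are illustrative.
      public static void runFragment(String dagId, String queryId, String fragId, Runnable work) {
        MDC.put("dagId", dagId);        // read by the routing appenders on this thread
        MDC.put("queryId", queryId);
        MDC.put("fragmentId", fragId);
        NDC.push(dagId);                // carrier for threads that clone the NDC later
        NDC.push(queryId);
        NDC.push(fragId);
        try {
          work.run();
        } finally {
          MDC.clear();                  // a pooled thread must never keep another query's ids
          NDC.clear();
        }
      }
    }

Clearing rather than popping in the finally block matches the patch: the handler thread is pooled, so any leftover context would be attributed to the next fragment it serves.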
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index c7e9d32..91b8727 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -287,6 +287,8 @@ private void initializeLogging(final Configuration conf) {
     URL llap_l4j2 = LlapDaemon.class.getClassLoader().getResource(LOG4j2_PROPERTIES_FILE);
     if (llap_l4j2 != null) {
       final boolean async = LogUtils.checkAndSetAsyncLogging(conf);
+      // Required for the MDC-based routing appender so that child threads can inherit the MDC context.
+      System.setProperty("isThreadContextMapInheritable", "true");
       Configurator.initialize("LlapDaemonLog4j2", llap_l4j2.toString());
       long end = System.currentTimeMillis();
       LOG.warn("LLAP daemon logging initialized from {} in {} ms. Async: {}",
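The new system property is what makes the per-query MDC usable across threads: with isThreadContextMapInheritable set, Log4j 2 backs its context map with an InheritableThreadLocal, so a thread created after the map is populated starts with a copy of its parent's entries. A standalone demo, assuming only log4j-api on the classpath; the property must be set before the ThreadContext class is initialized, which is why the daemon sets it before Configurator.initialize():

    import org.apache.logging.log4j.ThreadContext;

    public class InheritableContextDemo {
      public static void main(String[] args) throws Exception {
        System.setProperty("isThreadContextMapInheritable", "true");
        ThreadContext.put("queryId", "query_1");
        // The child thread is created after the put, so it inherits a copy of the map.
        Thread child = new Thread(
            () -> System.out.println("child sees queryId=" + ThreadContext.get("queryId")));
        child.start();
        child.join();
      }
    }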
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
index 9b3ce7e..363b9b1 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
@@ -17,8 +17,10 @@
  */
 package org.apache.hadoop.hive.llap.daemon.impl;
 
+import java.lang.reflect.Field;
 import java.util.List;
 import java.util.Map;
+import java.util.Stack;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.FutureTask;
@@ -30,6 +32,9 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader;
+import org.apache.log4j.MDC;
+import org.apache.log4j.NDC;
+import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.counters.FileSystemCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.runtime.task.TaskRunner2Callable;
@@ -100,10 +105,46 @@ public V call() throws Exception {
       // clone thread local file system statistics
       List statsBefore = LlapUtil.cloneThreadLocalFileSystemStatistics();
 
+      setupMDCFromNDC(actualCallable);
       try {
         return actualCallable.call();
       } finally {
         updateFileSystemCounters(statsBefore, actualCallable);
+        MDC.clear();
+      }
+    }
+
+    private void setupMDCFromNDC(final Callable<V> actualCallable) {
+      if (actualCallable instanceof CallableWithNdc) {
+        CallableWithNdc callableWithNdc = (CallableWithNdc) actualCallable;
+        try {
+          // CallableWithNdc inherits the NDC only when call() is invoked. CallableWithNdc would
+          // have to be extended to provide access to its ndcStack, which is cloned during
+          // creation. Until then, we use reflection to access the private field.
+          // FIXME: HIVE-14243 follow-up to remove this reflection
+          Field field = callableWithNdc.getClass().getSuperclass().getDeclaredField("ndcStack");
+          field.setAccessible(true);
+          Stack ndcStack = (Stack) field.get(callableWithNdc);
+
+          final Stack clonedStack = (Stack) ndcStack.clone();
+          final String fragmentId = (String) clonedStack.pop();
+          final String queryId = (String) clonedStack.pop();
+          final String dagId = (String) clonedStack.pop();
+          MDC.put("dagId", dagId);
+          MDC.put("queryId", queryId);
+          MDC.put("fragmentId", fragmentId);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Received dagId: {} queryId: {} instanceType: {}",
+                dagId, queryId, actualCallable.getClass().getSimpleName());
+          }
+        } catch (Exception e) {
+          LOG.warn("Not setting up MDC as NDC stack cannot be accessed reflectively for" +
+              " instance type: {} exception type: {}",
+              actualCallable.getClass().getSimpleName(), e.getClass().getSimpleName());
+        }
+      } else {
+        LOG.warn("Not setting up MDC as unknown callable instance type received: {}",
+            actualCallable.getClass().getSimpleName());
       }
     }
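Because CallableWithNdc keeps its cloned ndcStack in a private field, the pool has to read it reflectively until the HIVE-14243 follow-up adds an accessor. A self-contained sketch of that mechanism, where the hypothetical NdcCarrier stands in for the Tez class (the patch additionally goes through getSuperclass() because the concrete callables are subclasses):

    import java.lang.reflect.Field;
    import java.util.Stack;

    public class NdcReflectionSketch {
      // Hypothetical stand-in for CallableWithNdc with its private ndcStack.
      static class NdcCarrier {
        private final Stack<String> ndcStack = new Stack<>();
        NdcCarrier(String... values) {
          for (String v : values) {
            ndcStack.push(v);
          }
        }
      }

      public static void main(String[] args) throws Exception {
        NdcCarrier carrier = new NdcCarrier("dag_1", "query_1", "frag_1");
        Field field = NdcCarrier.class.getDeclaredField("ndcStack");
        field.setAccessible(true);        // open the private field
        @SuppressWarnings("unchecked")
        Stack<String> cloned = (Stack<String>) ((Stack<String>) field.get(carrier)).clone();
        System.out.println(cloned.pop()); // frag_1 -- last pushed, first popped
      }
    }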
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index fb64f0b..87bd5c8 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -22,6 +22,7 @@
 import java.security.PrivilegedExceptionAction;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Stack;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -50,6 +51,8 @@
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.log4j.MDC;
+import org.apache.log4j.NDC;
 import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.security.JobTokenIdentifier;
@@ -166,109 +169,126 @@ public long getStartTime() {
 
   @Override
   protected TaskRunner2Result callInternal() throws Exception {
-    isStarted.set(true);
+    setMDCFromNDC();
 
-    this.startTime = System.currentTimeMillis();
-    this.threadName = Thread.currentThread().getName();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("canFinish: " + taskSpec.getTaskAttemptID() + ": " + canFinish());
-    }
-
-    // Unregister from the AMReporter, since the task is now running.
-    this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort());
+    try {
+      isStarted.set(true);
 
-    synchronized (this) {
-      if (!shouldRunTask) {
-        LOG.info("Not starting task {} since it was killed earlier", taskSpec.getTaskAttemptID());
-        return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false);
+      this.startTime = System.currentTimeMillis();
+      this.threadName = Thread.currentThread().getName();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("canFinish: " + taskSpec.getTaskAttemptID() + ": " + canFinish());
       }
-    }
 
-    // TODO This executor seems unnecessary. Here and TezChild
-    executor = new StatsRecordingThreadPool(1, 1,
-        0L, TimeUnit.MILLISECONDS,
-        new LinkedBlockingQueue<Runnable>(),
-        new ThreadFactoryBuilder()
-            .setDaemon(true)
-            .setNameFormat("TezTaskRunner")
-            .build());
-
-    // TODO Consolidate this code with TezChild.
-    runtimeWatch.start();
-    if (taskUgi == null) {
-      taskUgi = UserGroupInformation.createRemoteUser(vertex.getUser());
-    }
-    taskUgi.addCredentials(credentials);
-
-    Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<>();
-    serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
-        TezCommonUtils.convertJobTokenToBytes(jobToken));
-    Multimap<String, String> startedInputsMap = createStartedInputMap(vertex);
-
-    UserGroupInformation taskOwner =
-        UserGroupInformation.createRemoteUser(vertex.getTokenIdentifier());
-    final InetSocketAddress address =
-        NetUtils.createSocketAddrForHost(request.getAmHost(), request.getAmPort());
-    SecurityUtil.setTokenService(jobToken, address);
-    taskOwner.addToken(jobToken);
-    umbilical = taskOwner.doAs(new PrivilegedExceptionAction<LlapTaskUmbilicalProtocol>() {
-      @Override
-      public LlapTaskUmbilicalProtocol run() throws Exception {
-        return RPC.getProxy(LlapTaskUmbilicalProtocol.class,
-            LlapTaskUmbilicalProtocol.versionID, address, conf);
-      }
-    });
-
-    String fragmentId = LlapTezUtils.stripAttemptPrefix(taskSpec.getTaskAttemptID().toString());
-    taskReporter = new LlapTaskReporter(
-        umbilical,
-        confParams.amHeartbeatIntervalMsMax,
-        confParams.amCounterHeartbeatInterval,
-        confParams.amMaxEventsPerHeartbeat,
-        new AtomicLong(0),
-        request.getContainerIdString(),
-        fragmentId,
-        initialEvent);
-
-    String attemptId = fragmentInfo.getFragmentIdentifierString();
-    IOContextMap.setThreadAttemptId(attemptId);
-    try {
+      // Unregister from the AMReporter, since the task is now running.
+      this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort());
+
       synchronized (this) {
-        if (shouldRunTask) {
-          taskRunner = new TezTaskRunner2(conf, taskUgi, fragmentInfo.getLocalDirs(),
-              taskSpec,
-              vertex.getQueryIdentifier().getAppAttemptNumber(),
-              serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor,
-              objectRegistry,
-              pid,
-              executionContext, memoryAvailable, false, tezHadoopShim);
+        if (!shouldRunTask) {
+          LOG.info("Not starting task {} since it was killed earlier", taskSpec.getTaskAttemptID());
+          return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false);
         }
       }
-      if (taskRunner == null) {
-        LOG.info("Not starting task {} since it was killed earlier", taskSpec.getTaskAttemptID());
-        return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false);
-      }
+      // TODO This executor seems unnecessary. Here and TezChild
+      executor = new StatsRecordingThreadPool(1, 1,
+          0L, TimeUnit.MILLISECONDS,
+          new LinkedBlockingQueue<Runnable>(),
+          new ThreadFactoryBuilder()
+              .setDaemon(true)
+              .setNameFormat("TezTaskRunner")
+              .build());
+
+      // TODO Consolidate this code with TezChild.
+      runtimeWatch.start();
+      if (taskUgi == null) {
+        taskUgi = UserGroupInformation.createRemoteUser(vertex.getUser());
+      }
+      taskUgi.addCredentials(credentials);
+
+      Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<>();
+      serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
+          TezCommonUtils.convertJobTokenToBytes(jobToken));
+      Multimap<String, String> startedInputsMap = createStartedInputMap(vertex);
+
+      UserGroupInformation taskOwner =
+          UserGroupInformation.createRemoteUser(vertex.getTokenIdentifier());
+      final InetSocketAddress address =
+          NetUtils.createSocketAddrForHost(request.getAmHost(), request.getAmPort());
+      SecurityUtil.setTokenService(jobToken, address);
+      taskOwner.addToken(jobToken);
+      umbilical = taskOwner.doAs(new PrivilegedExceptionAction<LlapTaskUmbilicalProtocol>() {
+        @Override
+        public LlapTaskUmbilicalProtocol run() throws Exception {
+          return RPC.getProxy(LlapTaskUmbilicalProtocol.class,
+              LlapTaskUmbilicalProtocol.versionID, address, conf);
+        }
+      });
+
+      String fragmentId = LlapTezUtils.stripAttemptPrefix(taskSpec.getTaskAttemptID().toString());
+      taskReporter = new LlapTaskReporter(
+          umbilical,
+          confParams.amHeartbeatIntervalMsMax,
+          confParams.amCounterHeartbeatInterval,
+          confParams.amMaxEventsPerHeartbeat,
+          new AtomicLong(0),
+          request.getContainerIdString(),
+          fragmentId,
+          initialEvent);
+
+      String attemptId = fragmentInfo.getFragmentIdentifierString();
+      IOContextMap.setThreadAttemptId(attemptId);
       try {
-        TaskRunner2Result result = taskRunner.run();
-        if (result.isContainerShutdownRequested()) {
-          LOG.warn("Unexpected container shutdown requested while running task. Ignoring");
+        synchronized (this) {
+          if (shouldRunTask) {
+            taskRunner = new TezTaskRunner2(conf, taskUgi, fragmentInfo.getLocalDirs(),
+                taskSpec,
+                vertex.getQueryIdentifier().getAppAttemptNumber(),
+                serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor,
+                objectRegistry,
+                pid,
+                executionContext, memoryAvailable, false, tezHadoopShim);
+          }
         }
-        isCompleted.set(true);
-        return result;
-      } finally {
-        FileSystem.closeAllForUGI(taskUgi);
-        LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" +
-            runtimeWatch.stop().elapsedMillis());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("canFinish post completion: " + taskSpec.getTaskAttemptID() + ": " + canFinish());
+        if (taskRunner == null) {
+          LOG.info("Not starting task {} since it was killed earlier", taskSpec.getTaskAttemptID());
+          return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false);
+        }
+
+        try {
+          TaskRunner2Result result = taskRunner.run();
+          if (result.isContainerShutdownRequested()) {
+            LOG.warn("Unexpected container shutdown requested while running task. Ignoring");
+          }
+          isCompleted.set(true);
+          return result;
+        } finally {
+          FileSystem.closeAllForUGI(taskUgi);
+          LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" +
+              runtimeWatch.stop().elapsedMillis());
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                "canFinish post completion: " + taskSpec.getTaskAttemptID() + ": " + canFinish());
+          }
         }
+      } finally {
+        IOContextMap.clearThreadAttempt(attemptId);
       }
     } finally {
-      IOContextMap.clearThreadAttempt(attemptId);
+      MDC.clear();
     }
   }
 
+  private void setMDCFromNDC() {
+    final Stack<String> clonedNDC = NDC.cloneStack();
+    final String fragId = clonedNDC.pop();
+    final String queryId = clonedNDC.pop();
+    final String dagId = clonedNDC.pop();
+    MDC.put("dagId", dagId);
+    MDC.put("queryId", queryId);
+    MDC.put("fragmentId", fragId);
+  }
+
   /**
    * Attempt to kill a running task. If the task has not started running, it will not start.
   * If it's already running, a kill request will be sent to it.
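Note that setMDCFromNDC() above is coupled to the exact push order in ContainerRunnerImpl.submitWork(): the NDC is a stack, so the three values come back in reverse, and popping fewer or more values than were pushed would mis-assign the keys. A plain-JDK illustration of that ordering contract:

    import java.util.Stack;

    public class NdcOrderExample {
      public static void main(String[] args) {
        Stack<String> ndc = new Stack<>(); // stands in for the thread's NDC
        ndc.push("dag_1");                 // push order used in submitWork()
        ndc.push("query_1");
        ndc.push("frag_1");
        System.out.println(ndc.pop());     // frag_1  -> MDC key "fragmentId"
        System.out.println(ndc.pop());     // query_1 -> MDC key "queryId"
        System.out.println(ndc.pop());     // dag_1   -> MDC key "dagId"
      }
    }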
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
index cc4e10b..c5d0680 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
@@ -42,6 +42,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.counters.FragmentCountersMap;
 import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
@@ -84,8 +85,10 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hive.common.util.HiveStringUtils;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
 
 public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowBatch>,
     VectorizedInputFormatInterface, SelfDescribingInputFormatInterface,
@@ -189,9 +192,14 @@ public LlapRecordReader(JobConf job, FileSplit split, List<Integer> includedCols
       this.columnIds = includedCols;
       this.sarg = ConvertAstToSearchArg.createFromConf(job);
       this.columnNames = ColumnProjectionUtils.getReadColumnNames(job);
-      String fragmentId = LlapTezUtils.getFragmentId(job);
+      final String fragmentId = LlapTezUtils.getFragmentId(job);
+      final String dagId = LlapTezUtils.getDagId(job);
+      final String queryId = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID);
+      MDC.put("dagId", dagId);
+      MDC.put("queryId", queryId);
       TezCounters taskCounters = null;
       if (fragmentId != null) {
+        MDC.put("fragmentId", fragmentId);
         taskCounters = FragmentCountersMap.getCountersForFragment(fragmentId);
         LOG.info("Received fragment id: {}", fragmentId);
       } else {
@@ -341,6 +349,7 @@ public void close() throws IOException {
       LlapIoImpl.LOG.info("Llap counters: {}" ,counters); // This is where counters are logged!
       feedback.stop();
       rethrowErrorIfAny();
+      MDC.clear();
     }
 
     private void rethrowErrorIfAny() throws IOException {
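One observation on the close() hunk above: MDC.clear() is placed after rethrowErrorIfAny(), so if that call throws, the reader thread exits close() with stale context keys still set. A defensive variant (an assumption on my part, not what the patch does) would clear in a finally block; sketched with org.slf4j.MDC, the API LlapInputFormat imports:

    import org.slf4j.MDC;

    public class ReaderCloseSketch {
      // Illustrative shape only: the Runnables stand in for the reader's real
      // cleanup calls (feedback.stop() and rethrowErrorIfAny()).
      public void close(Runnable stopFeedback, Runnable rethrowErrorIfAny) {
        try {
          stopFeedback.run();
          rethrowErrorIfAny.run();
        } finally {
          MDC.clear(); // runs even when rethrowErrorIfAny() throws
        }
      }
    }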
diff --git a/llap-server/src/main/resources/llap-daemon-log4j2.properties b/llap-server/src/main/resources/llap-daemon-log4j2.properties
index 7488ba2..7254931 100644
--- a/llap-server/src/main/resources/llap-daemon-log4j2.properties
+++ b/llap-server/src/main/resources/llap-daemon-log4j2.properties
@@ -28,14 +28,14 @@
 property.llap.daemon.log.maxfilesize = 256MB
 property.llap.daemon.log.maxbackupindex = 20
 
 # list of all appenders
-appenders = console, RFA, HISTORYAPPENDER
+appenders = console, RFA, HISTORYAPPENDER, dag-routing, query-routing
 
 # console appender
 appender.console.type = Console
 appender.console.name = console
 appender.console.target = SYSTEM_ERR
 appender.console.layout.type = PatternLayout
-appender.console.layout.pattern = %d{ISO8601} %5p [%t%x] %c{2}: %m%n
+appender.console.layout.pattern = %d{ISO8601} %5p [%t (%X{fragmentId})] %c{2}: %m%n
 
 # rolling file appender
 appender.RFA.type = RollingRandomAccessFile
@@ -43,7 +43,7 @@ appender.RFA.name = RFA
 appender.RFA.fileName = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.log.file}
 appender.RFA.filePattern = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.log.file}_%i
 appender.RFA.layout.type = PatternLayout
-appender.RFA.layout.pattern = %d{ISO8601} %5p [%t%x] %c{2}: %m%n
+appender.RFA.layout.pattern = %d{ISO8601} %5p [%t (%X{fragmentId})] %c{2}: %m%n
 appender.RFA.policies.type = Policies
 appender.RFA.policies.size.type = SizeBasedTriggeringPolicy
 appender.RFA.policies.size.size = ${sys:llap.daemon.log.maxfilesize}
@@ -63,6 +63,62 @@ appender.HISTORYAPPENDER.policies.size.size = ${sys:llap.daemon.log.maxfilesize}
 appender.HISTORYAPPENDER.strategy.type = DefaultRolloverStrategy
 appender.HISTORYAPPENDER.strategy.max = ${sys:llap.daemon.log.maxbackupindex}
 
+# dagId based routing file appender
+appender.dag-routing.type = Routing
+appender.dag-routing.name = dag-routing
+appender.dag-routing.routes.type = Routes
+appender.dag-routing.routes.pattern = $${ctx:dagId}
+# default route
+appender.dag-routing.routes.route-default.type = Route
+appender.dag-routing.routes.route-default.key = $${ctx:dagId}
+appender.dag-routing.routes.route-default.file-default.type = RollingRandomAccessFile
+appender.dag-routing.routes.route-default.file-default.name = file-default
+appender.dag-routing.routes.route-default.file-default.fileName = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.log.file}
+appender.dag-routing.routes.route-default.file-default.filePattern = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.log.file}_%d{MM-dd-yy-HH-mm}
+appender.dag-routing.routes.route-default.file-default.layout.type = PatternLayout
+appender.dag-routing.routes.route-default.file-default.layout.pattern = %d{ISO8601} %5p [%t (%X{fragmentId})] %c{2}: %m%n
+appender.dag-routing.routes.route-default.file-default.policies.type = Policies
+appender.dag-routing.routes.route-default.file-default.policies.time.type = TimeBasedTriggeringPolicy
+appender.dag-routing.routes.route-default.file-default.policies.time.interval = 1
+appender.dag-routing.routes.route-default.file-default.policies.time.modulate = true
+appender.dag-routing.routes.route-default.file-default.strategy.type = DefaultRolloverStrategy
+appender.dag-routing.routes.route-default.file-default.strategy.max = ${sys:llap.daemon.log.maxbackupindex}
+# dagId based route
+appender.dag-routing.routes.route-mdc.type = Route
+appender.dag-routing.routes.route-mdc.file-mdc.type = RandomAccessFile
+appender.dag-routing.routes.route-mdc.file-mdc.name = file-mdc
+appender.dag-routing.routes.route-mdc.file-mdc.fileName = ${sys:llap.daemon.log.dir}/${ctx:dagId}.log
+appender.dag-routing.routes.route-mdc.file-mdc.layout.type = PatternLayout
+appender.dag-routing.routes.route-mdc.file-mdc.layout.pattern = %d{ISO8601} %5p [%t (%X{fragmentId})] %c{2}: %m%n
+
+# queryId based routing file appender
+appender.query-routing.type = Routing
+appender.query-routing.name = query-routing
+appender.query-routing.routes.type = Routes
+appender.query-routing.routes.pattern = $${ctx:queryId}
+# default route
+appender.query-routing.routes.route-default.type = Route
+appender.query-routing.routes.route-default.key = $${ctx:queryId}
+appender.query-routing.routes.route-default.file-default.type = RollingRandomAccessFile
+appender.query-routing.routes.route-default.file-default.name = file-default
+appender.query-routing.routes.route-default.file-default.fileName = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.log.file}
+appender.query-routing.routes.route-default.file-default.filePattern = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.log.file}_%d{MM-dd-yy-HH-mm}
+appender.query-routing.routes.route-default.file-default.layout.type = PatternLayout
+appender.query-routing.routes.route-default.file-default.layout.pattern = %d{ISO8601} %5p [%t (%X{fragmentId})] %c{2}: %m%n
+appender.query-routing.routes.route-default.file-default.policies.type = Policies
+appender.query-routing.routes.route-default.file-default.policies.time.type = TimeBasedTriggeringPolicy
+appender.query-routing.routes.route-default.file-default.policies.time.interval = 1
+appender.query-routing.routes.route-default.file-default.policies.time.modulate = true
+appender.query-routing.routes.route-default.file-default.strategy.type = DefaultRolloverStrategy
+appender.query-routing.routes.route-default.file-default.strategy.max = ${sys:llap.daemon.log.maxbackupindex}
+# queryId based route
+appender.query-routing.routes.route-mdc.type = Route
+appender.query-routing.routes.route-mdc.file-mdc.type = RandomAccessFile
+appender.query-routing.routes.route-mdc.file-mdc.name = file-mdc
+appender.query-routing.routes.route-mdc.file-mdc.fileName = ${sys:llap.daemon.log.dir}/${ctx:queryId}.log
+appender.query-routing.routes.route-mdc.file-mdc.layout.type = PatternLayout
+appender.query-routing.routes.route-mdc.file-mdc.layout.pattern = %d{ISO8601} %5p [%t (%X{fragmentId})] %c{2}: %m%n
+
 # list of all loggers
 loggers = NIOServerCnxn, ClientCnxnSocketNIO, DataNucleus, Datastore, JPOX, HistoryLogger, LlapIoImpl, LlapIoOrc, LlapIoCache, LlapIoLocking
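How the routing above resolves: the Routes pattern $${ctx:queryId} is re-evaluated for every log event. When the MDC carries no queryId, the lookup stays unresolved and the result equals the default route's key, so the event falls through to the regular daemon log file; when queryId is set, the keyless route-mdc matches and writes to ${sys:llap.daemon.log.dir}/<queryId>.log. A small driver showing both paths, assuming the configuration above is active:

    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.apache.logging.log4j.ThreadContext;

    public class RoutingDemo {
      private static final Logger LOG = LogManager.getLogger(RoutingDemo.class);

      public static void main(String[] args) {
        LOG.info("no queryId set");       // unresolved lookup -> default route (daemon log)
        ThreadContext.put("queryId", "query_1");
        LOG.info("query-scoped message"); // routed to <llap.daemon.log.dir>/query_1.log
        ThreadContext.clearAll();
      }
    }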
diff --git a/llap-server/src/main/resources/package.py b/llap-server/src/main/resources/package.py
index 83fe918..a200414 100644
--- a/llap-server/src/main/resources/package.py
+++ b/llap-server/src/main/resources/package.py
@@ -71,6 +71,7 @@ def main(args):
 	parser.add_argument("--args", default="")
 	parser.add_argument("--name", default="llap0")
 	parser.add_argument("--loglevel", default="INFO")
+	parser.add_argument("--logger", default="query-routing")
 	parser.add_argument("--chaosmonkey", type=int, default=0)
 	parser.add_argument("--slider-am-container-mb", type=int, default=1024)
 	parser.add_argument("--slider-keytab-dir", default="")
@@ -120,6 +121,7 @@ def main(args):
 		"name" : resource.clusterName,
 		"daemon_args" : daemon_args,
 		"daemon_loglevel" : args.loglevel,
+		"daemon_logger" : args.logger,
 		"queue.string" : resource.queueString,
 		"monkey_interval" : args.chaosmonkey,
 		"monkey_percentage" : monkey_percentage,
diff --git a/llap-server/src/main/resources/templates.py b/llap-server/src/main/resources/templates.py
index 8baa927..505219a 100644
--- a/llap-server/src/main/resources/templates.py
+++ b/llap-server/src/main/resources/templates.py
@@ -74,7 +74,7 @@
     "site.global.app_user": "yarn",
     "site.global.app_root": "${AGENT_WORK_ROOT}/app/install/",
     "site.global.app_tmp_dir": "${AGENT_WORK_ROOT}/tmp/",
-    "site.global.app_logger": "RFA",
+    "site.global.app_logger": "%(daemon_logger)s",
     "site.global.app_log_level": "%(daemon_loglevel)s",
    "site.global.additional_cp": "%(hadoop_home)s",
     "site.global.daemon_args": "%(daemon_args)s",
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java
index eda8862..e4af660 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTezUtils.java
@@ -14,24 +14,14 @@
 package org.apache.hadoop.hive.llap.tezplugins;
 
-import java.text.NumberFormat;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.input.MultiMRInput;
 
-import com.google.common.base.Joiner;
-
 @InterfaceAudience.Private
 public class LlapTezUtils {
   public static boolean isSourceOfInterest(String inputClassName) {
@@ -40,6 +30,10 @@ public static boolean isSourceOfInterest(String inputClassName) {
         MultiMRInput.class.getName()) || inputClassName.equals(MRInput.class.getName()));
   }
 
+  public static String getDagId(final JobConf job) {
+    return job.get(MRInput.TEZ_MAPREDUCE_DAG_ID);
+  }
+
   public static String getFragmentId(final JobConf job) {
     String taskAttemptId = job.get(MRInput.TEZ_MAPREDUCE_TASK_ATTEMPT_ID);
     if (taskAttemptId != null) {