diff --git a/common/src/java/org/apache/hadoop/hive/common/CallableWithNdc.java b/common/src/java/org/apache/hadoop/hive/common/CallableWithNdc.java deleted file mode 100644 index 2b78884..0000000 --- a/common/src/java/org/apache/hadoop/hive/common/CallableWithNdc.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.common; - -import java.util.Stack; -import java.util.concurrent.Callable; - -import org.apache.log4j.NDC; - -// TODO: cloned from TEZ-2003; replace when that's in a release. -public abstract class CallableWithNdc implements Callable { - private final Stack ndcStack; - - public CallableWithNdc() { - ndcStack = NDC.cloneStack(); - } - - @Override - public final T call() throws Exception { - NDC.inherit(ndcStack); - try { - return callInternal(); - } finally { - NDC.clear(); - } - } - - protected abstract T callInternal() throws Exception; -} diff --git a/common/src/java/org/apache/hadoop/hive/common/RunnableWithNdc.java b/common/src/java/org/apache/hadoop/hive/common/RunnableWithNdc.java deleted file mode 100644 index 35a45d1..0000000 --- a/common/src/java/org/apache/hadoop/hive/common/RunnableWithNdc.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.common; - -import java.util.Stack; - -import org.apache.log4j.NDC; - -//TODO: cloned from TEZ-2003; replace when that's in a release. 
-public abstract class RunnableWithNdc implements Runnable { - private final Stack ndcStack; - - public RunnableWithNdc() { - ndcStack = NDC.cloneStack(); - } - - @Override - public final void run() { - NDC.inherit(ndcStack); - try { - runInternal(); - } finally { - NDC.clear(); - } - } - - protected abstract void runInternal(); -} diff --git a/llap-server/bin/llap-daemon-env.sh b/llap-server/bin/llap-daemon-env.sh index a2298ad..ccc764e 100755 --- a/llap-server/bin/llap-daemon-env.sh +++ b/llap-server/bin/llap-daemon-env.sh @@ -32,7 +32,10 @@ #export LLAP_DAEMON_USER_CLASSPATH= # Logger setup for LLAP daemon -#export LLAP_DAEMON_LOGGER=INFO,RFA +#export LLAP_DAEMON_LOG_LEVEL=INFO + +# Root logger for LLAP daemon +#export LLAP_DAEMON_LOGGER=console # Directory to which logs will be generated #export LLAP_DAEMON_LOG_DIR= diff --git a/llap-server/bin/llapDaemon.sh b/llap-server/bin/llapDaemon.sh index 010e1e6..6f57998 100755 --- a/llap-server/bin/llapDaemon.sh +++ b/llap-server/bin/llapDaemon.sh @@ -30,7 +30,7 @@ #set -x -usage="Usage: llap-daemon.sh (start|stop) " +usage="Usage: llapDaemon.sh (start|stop) " # if no args specified, show usage if [ $# -le 0 ]; then @@ -86,10 +86,6 @@ fi # some variables LLAP_DAEMON_LOG_BASE=llap-daemon-$USER-$HOSTNAME export LLAP_DAEMON_LOG_FILE=$LLAP_DAEMON_LOG_BASE.log -if [ ! -n "${LLAP_DAEMON_LOGGER}" ]; then - echo "LLAP_DAEMON_LOGGER not defined... using defaults" - LLAP_DAEMON_LOGGER=${LOG_LEVEL_DEFAULT} -fi logLog=$LLAP_DAEMON_LOG_DIR/$LLAP_DAEMON_LOG_BASE.log logOut=$LLAP_DAEMON_LOG_DIR/$LLAP_DAEMON_LOG_BASE.out pid=$LLAP_DAEMON_PID_DIR/llap-daemon.pid diff --git a/llap-server/bin/runLlapDaemon.sh b/llap-server/bin/runLlapDaemon.sh index b39c4fd..b596250 100755 --- a/llap-server/bin/runLlapDaemon.sh +++ b/llap-server/bin/runLlapDaemon.sh @@ -48,7 +48,8 @@ shift JAVA=$JAVA_HOME/bin/java -LOG_LEVEL_DEFAULT="INFO,console" +LOG_LEVEL_DEFAULT="INFO" +ROOT_LOGGER_DEFAULT="console" JAVA_OPTS_BASE="-server -Djava.net.preferIPv4Stack=true -XX:NewRatio=8 -XX:+UseNUMA -XX:+PrintGCDetails -verbose:gc -XX:+PrintGCTimeStamps" # CLASSPATH initially contains $HADOOP_CONF_DIR & $YARN_CONF_DIR @@ -71,8 +72,13 @@ if [ ! -d "${LLAP_DAEMON_CONF_DIR}" ]; then fi if [ ! -n "${LLAP_DAEMON_LOGGER}" ]; then - echo "LLAP_DAEMON_LOGGER not defined... using defaults" - LLAP_DAEMON_LOGGER=${LOG_LEVEL_DEFAULT} + echo "LLAP_DAEMON_LOGGER not defined... using default: ${ROOT_LOGGER_DEFAULT}" + LLAP_DAEMON_LOGGER=${ROOT_LOGGER_DEFAULT} +fi + +if [ ! -n "${LLAP_DAEMON_LOG_LEVEL}" ]; then + echo "LLAP_DAEMON_LOG_LEVEL not defined... using default: ${LOG_LEVEL_DEFAULT}" + LLAP_DAEMON_LOG_LEVEL=${LOG_LEVEL_DEFAULT} fi CLASSPATH=${LLAP_DAEMON_CONF_DIR}:${LLAP_DAEMON_HOME}/lib/*:`${HADOOP_PREFIX}/bin/hadoop classpath`:. 
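Context for the next runLlapDaemon.sh hunk: it points the daemon at the new log4j2 configuration file and adds -DisThreadContextMapInheritable=true. That JVM flag makes log4j2's ThreadContext map (which also backs the org.apache.log4j.MDC bridge calls added in TaskRunnerCallable further down in this patch) use an inheritable thread-local, so context entries set before a worker thread is spawned show up in that thread's log lines. A minimal, self-contained sketch of the behaviour the flag changes, assuming log4j2 is on the classpath and a layout that includes %X{fragId}; the class name, key, and value below are illustrative only, not part of this patch:

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.ThreadContext;

public class ThreadContextInheritanceDemo {
  private static final Logger LOG = LogManager.getLogger(ThreadContextInheritanceDemo.class);

  public static void main(String[] args) throws Exception {
    // Hypothetical fragment id, only for illustration.
    ThreadContext.put("fragId", "attempt_1_0001_1_00_000000_0");
    LOG.info("parent thread");   // %X{fragId} resolves in the parent thread

    Thread child = new Thread(() -> LOG.info("child thread"));
    child.start();               // %X{fragId} is empty in the child unless the JVM was
    child.join();                // started with -DisThreadContextMapInheritable=true

    ThreadContext.clearMap();
  }
}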
@@ -120,10 +126,12 @@ if [ -n "$LLAP_DAEMON_TMP_DIR" ]; then export LLAP_DAEMON_OPTS="${LLAP_DAEMON_OPTS} -Djava.io.tmpdir=$LLAP_DAEMON_TMP_DIR" fi -LLAP_DAEMON_OPTS="${LLAP_DAEMON_OPTS} -Dlog4j.configuration=llap-daemon-log4j.properties" +LLAP_DAEMON_OPTS="${LLAP_DAEMON_OPTS} -Dlog4j.configurationFile=llap-daemon-log4j2.xml" LLAP_DAEMON_OPTS="${LLAP_DAEMON_OPTS} -Dllap.daemon.log.dir=${LLAP_DAEMON_LOG_DIR}" LLAP_DAEMON_OPTS="${LLAP_DAEMON_OPTS} -Dllap.daemon.log.file=${LLAP_DAEMON_LOG_FILE}" LLAP_DAEMON_OPTS="${LLAP_DAEMON_OPTS} -Dllap.daemon.root.logger=${LLAP_DAEMON_LOGGER}" +LLAP_DAEMON_OPTS="${LLAP_DAEMON_OPTS} -Dllap.daemon.log.level=${LLAP_DAEMON_LOG_LEVEL}" +LLAP_DAEMON_OPTS="${LLAP_DAEMON_OPTS} -DisThreadContextMapInheritable=true" exec "$JAVA" -Dproc_llapdaemon -Xms${LLAP_DAEMON_HEAPSIZE}m -Xmx${LLAP_DAEMON_HEAPSIZE}m ${LLAP_DAEMON_OPTS} -classpath "$CLASSPATH" $CLASS "$@" diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java index 2fd2546..6a80370 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java @@ -14,12 +14,12 @@ package org.apache.hadoop.hive.llap.daemon.impl; -import javax.net.SocketFactory; import java.io.IOException; import java.net.InetSocketAddress; import java.security.PrivilegedExceptionAction; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; @@ -30,14 +30,9 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; +import javax.net.SocketFactory; + import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.common.CallableWithNdc; import org.apache.hadoop.hive.llap.LlapNodeId; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler; @@ -56,6 +51,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + /** * Responsible for communicating with various AMs. 
*/ @@ -238,10 +240,10 @@ public void onFailure(Throwable t) { }); } - private class QueueLookupCallable extends CallableWithNdc { + private class QueueLookupCallable implements Callable { @Override - protected Void callInternal() { + public Void call() { while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) { try { final AMNodeInfo amNodeInfo = pendingHeartbeatQueeu.take(); @@ -291,7 +293,7 @@ public void onFailure(Throwable t) { } } - private class KillTaskCallable extends CallableWithNdc { + private class KillTaskCallable implements Callable { final AMNodeInfo amNodeInfo; final TezTaskAttemptID taskAttemptId; @@ -302,7 +304,7 @@ public KillTaskCallable(TezTaskAttemptID taskAttemptId, } @Override - protected Void callInternal() { + public Void call() { try { amNodeInfo.getUmbilical().taskKilled(taskAttemptId); } catch (IOException e) { @@ -316,7 +318,7 @@ protected Void callInternal() { } } - private class AMHeartbeatCallable extends CallableWithNdc { + private class AMHeartbeatCallable implements Callable { final AMNodeInfo amNodeInfo; @@ -325,7 +327,7 @@ public AMHeartbeatCallable(AMNodeInfo amNodeInfo) { } @Override - protected Void callInternal() { + public Void call() { if (LOG.isTraceEnabled()) { LOG.trace("Attempting to heartbeat to AM: " + amNodeInfo); } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index fad2d2c..ca1f6f0 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -50,7 +50,6 @@ import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper; -import org.apache.log4j.NDC; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; import org.apache.tez.dag.api.TezConfiguration; @@ -154,64 +153,58 @@ public void submitWork(SubmitWorkRequestProto request) throws IOException { LOG.info("Queueing container for execution: " + stringifySubmitRequest(request)); } // This is the start of container-annotated logging. - // TODO Reduce the length of this string. Way too verbose at the moment. - String ndcContextString = request.getFragmentSpec().getFragmentIdentifierString(); - NDC.push(ndcContextString); + Map env = new HashMap<>(); + // TODO What else is required in this environment map. + env.putAll(localEnv); + env.put(ApplicationConstants.Environment.USER.name(), request.getUser()); + + FragmentSpecProto fragmentSpec = request.getFragmentSpec(); + TezTaskAttemptID taskAttemptId = TezTaskAttemptID.fromString( + fragmentSpec.getFragmentIdentifierString()); + int dagIdentifier = taskAttemptId.getTaskID().getVertexID().getDAGId().getId(); + + QueryFragmentInfo fragmentInfo = queryTracker + .registerFragment(null, request.getApplicationIdString(), fragmentSpec.getDagName(), + dagIdentifier, + fragmentSpec.getVertexName(), fragmentSpec.getFragmentNumber(), + fragmentSpec.getAttemptNumber(), request.getUser(), request.getFragmentSpec()); + + String[] localDirs = fragmentInfo.getLocalDirs(); + Preconditions.checkNotNull(localDirs); + + if (LOG.isDebugEnabled()) { + LOG.debug("Dirs are: " + Arrays.toString(localDirs)); + } + // May need to setup localDir for re-localization, which is usually setup as Environment.PWD. 
+ // Used for re-localization, to add the user specified configuration (conf_pb_binary_stream) + + Credentials credentials = new Credentials(); + DataInputBuffer dib = new DataInputBuffer(); + byte[] tokenBytes = request.getCredentialsBinary().toByteArray(); + dib.reset(tokenBytes, tokenBytes.length); + credentials.readTokenStorageStream(dib); + + Token jobToken = TokenCache.getSessionToken(credentials); + + LOG.info("DEBUG: Registering request with the ShuffleHandler"); + ShuffleHandler.get() + .registerDag(request.getApplicationIdString(), dagIdentifier, jobToken, + request.getUser(), localDirs); + + TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, + new Configuration(getConfig()), + new LlapExecutionContext(localAddress.get().getHostName(), queryTracker), env, + credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler, + this); try { - Map env = new HashMap<>(); - // TODO What else is required in this environment map. - env.putAll(localEnv); - env.put(ApplicationConstants.Environment.USER.name(), request.getUser()); - - FragmentSpecProto fragmentSpec = request.getFragmentSpec(); - TezTaskAttemptID taskAttemptId = TezTaskAttemptID.fromString( - fragmentSpec.getFragmentIdentifierString()); - int dagIdentifier = taskAttemptId.getTaskID().getVertexID().getDAGId().getId(); - - QueryFragmentInfo fragmentInfo = queryTracker - .registerFragment(null, request.getApplicationIdString(), fragmentSpec.getDagName(), - dagIdentifier, - fragmentSpec.getVertexName(), fragmentSpec.getFragmentNumber(), - fragmentSpec.getAttemptNumber(), request.getUser(), request.getFragmentSpec()); - - String[] localDirs = fragmentInfo.getLocalDirs(); - Preconditions.checkNotNull(localDirs); - - if (LOG.isDebugEnabled()) { - LOG.debug("Dirs are: " + Arrays.toString(localDirs)); - } - // May need to setup localDir for re-localization, which is usually setup as Environment.PWD. - // Used for re-localization, to add the user specified configuration (conf_pb_binary_stream) - - Credentials credentials = new Credentials(); - DataInputBuffer dib = new DataInputBuffer(); - byte[] tokenBytes = request.getCredentialsBinary().toByteArray(); - dib.reset(tokenBytes, tokenBytes.length); - credentials.readTokenStorageStream(dib); - - Token jobToken = TokenCache.getSessionToken(credentials); - - LOG.info("DEBUG: Registering request with the ShuffleHandler"); - ShuffleHandler.get() - .registerDag(request.getApplicationIdString(), dagIdentifier, jobToken, - request.getUser(), localDirs); - - TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, new Configuration(getConfig()), - new LlapExecutionContext(localAddress.get().getHostName(), queryTracker), env, - credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler, - this); - try { - executorService.schedule(callable); - } catch (RejectedExecutionException e) { - // Stop tracking the fragment and re-throw the error. - fragmentComplete(fragmentInfo); - throw e; - } - metrics.incrExecutorTotalRequestsHandled(); - metrics.incrExecutorNumQueuedRequests(); - } finally { - NDC.pop(); + executorService.schedule(callable); + } catch (RejectedExecutionException e) { + // Stop tracking the fragment and re-throw the error. 
+ fragmentComplete(fragmentInfo); + throw e; } + metrics.incrExecutorTotalRequestsHandled(); + metrics.incrExecutorNumQueuedRequests(); } private static class LlapExecutionContext extends ExecutionContextImpl diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index e0bd48a..bee0325 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -22,14 +22,13 @@ import java.security.PrivilegedExceptionAction; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.common.CallableWithNdc; import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; import org.apache.hadoop.hive.llap.daemon.HistoryLogger; import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler; @@ -47,6 +46,7 @@ import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.log4j.MDC; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; @@ -62,6 +62,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; @@ -73,7 +74,7 @@ /** * */ -public class TaskRunnerCallable extends CallableWithNdc { +public class TaskRunnerCallable implements Callable { private static final Logger LOG = LoggerFactory.getLogger(TaskRunnerCallable.class); private final SubmitWorkRequestProto request; private final Configuration conf; @@ -145,7 +146,14 @@ public long getStartTime() { } @Override - protected TaskRunner2Result callInternal() throws Exception { + public TaskRunner2Result call() throws Exception { + + // TODO: Fragment ID should be made smaller + final String applicationId = request.getApplicationIdString(); + final String fragmentId = request.getFragmentSpec().getFragmentIdentifierString(); + MDC.put("appId", applicationId); + MDC.put("fragId", fragmentId); + isStarted.set(true); this.startTime = System.currentTimeMillis(); @@ -168,8 +176,7 @@ protected TaskRunner2Result callInternal() throws Exception { ExecutorService executorReal = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() .setDaemon(true) - .setNameFormat( - "TezTaskRunner_" + request.getFragmentSpec().getFragmentIdentifierString()) + .setNameFormat("TezTaskRunner_" + fragmentId) .build()); executor = MoreExecutors.listeningDecorator(executorReal); @@ -242,6 +249,8 @@ public LlapTaskUmbilicalProtocol run() throws Exception { } } finally { IOContextMap.clearThreadAttempt(attemptId); + MDC.remove("appId"); + MDC.remove("fragId"); } } diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index 4f7bb78..bd43213 100644 --- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@ -6,20 +6,20 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.Callable; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.CallableWithNdc; import org.apache.hadoop.hive.common.Pool; import org.apache.hadoop.hive.common.Pool.PoolObjectHelper; -import org.apache.hadoop.hive.common.io.DataCache; import org.apache.hadoop.hive.common.io.Allocator; -import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData; +import org.apache.hadoop.hive.common.io.DataCache; import org.apache.hadoop.hive.common.io.DiskRange; import org.apache.hadoop.hive.common.io.DiskRangeList; +import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData; import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.ConsumerFeedback; @@ -40,24 +40,24 @@ import org.apache.hadoop.hive.ql.io.orc.CompressionKind; import org.apache.hadoop.hive.ql.io.orc.DataReader; import org.apache.hadoop.hive.ql.io.orc.MetadataReader; +import org.apache.hadoop.hive.ql.io.orc.OrcConf; import org.apache.hadoop.hive.ql.io.orc.OrcFile; import org.apache.hadoop.hive.ql.io.orc.OrcFile.ReaderOptions; -import org.apache.hadoop.hive.ql.io.orc.OrcConf; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcProto; import org.apache.hadoop.hive.ql.io.orc.OrcSplit; -import org.apache.hadoop.hive.ql.io.orc.encoded.Reader; import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl; import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.SargApplier; +import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils; +import org.apache.hadoop.hive.ql.io.orc.StripeInformation; import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer; import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedOrcFile; import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReader; import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey; import org.apache.hadoop.hive.ql.io.orc.encoded.OrcCacheKey; +import org.apache.hadoop.hive.ql.io.orc.encoded.Reader; import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch; import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.PoolFactory; -import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils; -import org.apache.hadoop.hive.ql.io.orc.StripeInformation; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; @@ -69,8 +69,8 @@ * it inserts itself into the pipeline to put the data in cache, before passing it to the real * consumer. It also serves as ConsumerFeedback that receives processed EncodedColumnBatch-es. 
*/ -public class OrcEncodedDataReader extends CallableWithNdc - implements ConsumerFeedback, Consumer { +public class OrcEncodedDataReader implements Callable, + ConsumerFeedback, Consumer { private static final Log LOG = LogFactory.getLog(OrcEncodedDataReader.class); public static final FixedSizedObjectPool CSD_POOL = new FixedSizedObjectPool<>(8192, new PoolObjectHelper() { @@ -179,7 +179,7 @@ public void unpause() { } @Override - protected Void callInternal() throws IOException { + public Void call() throws IOException { long startTime = counters.startTimeCounter(); if (LlapIoImpl.LOGL.isInfoEnabled()) { LlapIoImpl.LOG.info("Processing data for " + split.getPath()); diff --git a/llap-server/src/main/resources/llap-daemon-log4j.properties b/llap-server/src/main/resources/llap-daemon-log4j.properties deleted file mode 100644 index 51593e9..0000000 --- a/llap-server/src/main/resources/llap-daemon-log4j.properties +++ /dev/null @@ -1,78 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Define some default values that can be overridden by system properties -llap.daemon.root.logger=INFO,console -llap.daemon.log.dir=. -llap.daemon.log.file=llapdaemon.log - -# Define the root logger to the system property "llap.daemon.root.logger". -log4j.rootLogger=${llap.daemon.root.logger} - -# Logging Threshold -log4j.threshold=ALL - -# Null Appender -log4j.appender.NullAppender=org.apache.log4j.varia.NullAppender - -# -# Rolling File Appender - cap space usage at 5gb. 
-# -llap.daemon.log.maxfilesize=256MB -llap.daemon.log.maxbackupindex=20 -log4j.appender.RFA=org.apache.log4j.RollingFileAppender -log4j.appender.RFA.File=${llap.daemon.log.dir}/${llap.daemon.log.file} - -log4j.appender.RFA.MaxFileSize=${llap.daemon.log.maxfilesize} -log4j.appender.RFA.MaxBackupIndex=${llap.daemon.log.maxbackupindex} - -log4j.appender.RFA.layout=org.apache.log4j.PatternLayout - -# Pattern format: Date LogLevel LoggerName LogMessage -log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t(%x)] %p %c: %m%n -# Debugging Pattern format -#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n - - -# -# Daily Rolling File Appender -# - -log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender -log4j.appender.DRFA.File=${llap.daemon.log.dir}/${llap.daemon.log.file} - -# Rollver at midnight -log4j.appender.DRFA.DatePattern=.yyyy-MM-dd - -# 30-day backup -#log4j.appender.DRFA.MaxBackupIndex=30 -log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout - -# Pattern format: Date LogLevel LoggerName LogMessage -log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} [%t(%x)] %p %c: %m%n -# Debugging Pattern format -#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n - - -# -# console -# Add "console" to rootlogger above if you want to use this -# - -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.target=System.err -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} [%t(%x)] %p %c{2} : %m%n diff --git a/llap-server/src/main/resources/llap-daemon-log4j2.xml b/llap-server/src/main/resources/llap-daemon-log4j2.xml new file mode 100644 index 0000000..3ac61b1 --- /dev/null +++ b/llap-server/src/main/resources/llap-daemon-log4j2.xml @@ -0,0 +1,93 @@ + + + + + + + ALL + INFO + console + . 
+ llapdaemon.log + llapdaemon_history.log + 256MB + 20 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java index 38af07e..24a6dee 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java @@ -131,7 +131,7 @@ public MockRequest(SubmitWorkRequestProto requestProto, } @Override - protected TaskRunner2Result callInternal() { + public TaskRunner2Result call() { try { logInfo(super.getRequestId() + " is executing..", null); lock.lock(); diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java index ebfb430..f71931c 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java @@ -20,70 +20,21 @@ import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createTaskWrapper; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; -import static org.mockito.Mockito.mock; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; -import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler; import org.apache.hadoop.hive.llap.daemon.impl.EvictingPriorityBlockingQueue; -import org.apache.hadoop.hive.llap.daemon.impl.QueryFragmentInfo; import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper; -import org.apache.hadoop.hive.llap.daemon.impl.TaskRunnerCallable; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; -import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; -import org.apache.tez.runtime.api.impl.ExecutionContextImpl; -import org.apache.tez.runtime.task.EndReason; -import org.apache.tez.runtime.task.TaskRunner2Result; -import org.junit.Before; import org.junit.Test; public class TestFirstInFirstOutComparator { - private static Configuration conf; - private static Credentials cred = new Credentials(); - - private static class MockRequest extends TaskRunnerCallable { - private int workTime; - private boolean canFinish; - - public MockRequest(SubmitWorkRequestProto requestProto, - boolean canFinish, int workTime) { - super(requestProto, mock(QueryFragmentInfo.class), conf, - new ExecutionContextImpl("localhost"), null, cred, 0, null, null, null, - mock(KilledTaskHandler.class), mock( - FragmentCompletionHandler.class)); - this.workTime = workTime; - this.canFinish = canFinish; - } - - @Override - protected 
TaskRunner2Result callInternal() { - System.out.println(super.getRequestId() + " is executing.."); - try { - Thread.sleep(workTime); - } catch (InterruptedException e) { - return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false); - } - return new TaskRunner2Result(EndReason.SUCCESS, null, false); - } - - @Override - public boolean canFinish() { - return canFinish; - } - } - - @Before - public void setup() { - conf = new Configuration(); - } private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndUpstreamTasks, int dagStartTime, int attemptStartTime) { diff --git a/llap-server/src/test/resources/llap-daemon-log4j.properties b/llap-server/src/test/resources/llap-daemon-log4j.properties deleted file mode 100644 index 209436e..0000000 --- a/llap-server/src/test/resources/llap-daemon-log4j.properties +++ /dev/null @@ -1,94 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Define some default values that can be overridden by system properties -llap.daemon.root.logger=INFO,console -llap.daemon.log.dir=. -llap.daemon.log.file=llapdaemon.log - -llap.daemon.historylog.file=llapdaemon_history.log -log4j.logger.org.apache.hadoop.hive.llap.daemon.HistoryLogger=INFO,HISTORYAPPENDER - -# Define the root logger to the system property "llap.daemon.root.logger". -log4j.rootLogger=${llap.daemon.root.logger} - -# Logging Threshold -log4j.threshold=ALL - - -# Null Appender -log4j.appender.NullAppender=org.apache.log4j.varia.NullAppender - - - -# History Events appender -log4j.appender.HISTORYAPPENDER=org.apache.log4j.RollingFileAppender -log4j.appender.HISTORYAPPENDER.File=${llap.daemon.log.dir}/${llap.daemon.historylog.file} -log4j.appender.HISTORYAPPENDER.MaxFileSize=${llap.daemon.log.maxfilesize} -log4j.appender.HISTORYAPPENDER.MaxBackupIndex=${llap.daemon.log.maxbackupindex} -log4j.appender.HISTORYAPPENDER.layout=org.apache.log4j.EnhancedPatternLayout -log4j.appender.HISTORYAPPENDER.layout.ConversionPattern=%m%n - - - -# Rolling File Appender - cap space usage at 5gb. 
-# -llap.daemon.log.maxfilesize=256MB -llap.daemon.log.maxbackupindex=20 -log4j.appender.RFA=org.apache.log4j.RollingFileAppender -log4j.appender.RFA.File=${llap.daemon.log.dir}/${llap.daemon.log.file} -log4j.appender.RFA.Append=true - -log4j.appender.RFA.MaxFileSize=${llap.daemon.log.maxfilesize} -log4j.appender.RFA.MaxBackupIndex=${llap.daemon.log.maxbackupindex} - -log4j.appender.RFA.layout=org.apache.log4j.EnhancedPatternLayout - -# Pattern format: Date LogLevel LoggerName LogMessage -log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t(%x)] %p %c: %m%n -# Debugging Pattern format -#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n - - -# -# Daily Rolling File Appender -# - -log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender -log4j.appender.DRFA.File=${llap.daemon.log.dir}/${llap.daemon.log.file} - -# Rollver at midnight -log4j.appender.DRFA.DatePattern=.yyyy-MM-dd - -# 30-day backup -#log4j.appender.DRFA.MaxBackupIndex=30 -log4j.appender.DRFA.layout=org.apache.log4j.EnhancedPatternLayout - -# Pattern format: Date LogLevel LoggerName LogMessage -log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} [%t(%x)] %p %c: %m%n -# Debugging Pattern format -#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n - - -# -# console -# Add "console" to rootlogger above if you want to use this -# - -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.target=System.err -log4j.appender.console.layout=org.apache.log4j.EnhancedPatternLayout -log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} [%t(%x)] %p %c{2} : %m%n diff --git a/llap-server/src/test/resources/llap-daemon-log4j2.xml b/llap-server/src/test/resources/llap-daemon-log4j2.xml new file mode 100644 index 0000000..3ac61b1 --- /dev/null +++ b/llap-server/src/test/resources/llap-daemon-log4j2.xml @@ -0,0 +1,93 @@ + + + + + + + ALL + INFO + console + . + llapdaemon.log + llapdaemon_history.log + 256MB + 20 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/packaging/src/main/assembly/bin.xml b/packaging/src/main/assembly/bin.xml index f0c96ed..2c52d4b 100644 --- a/packaging/src/main/assembly/bin.xml +++ b/packaging/src/main/assembly/bin.xml @@ -357,9 +357,9 @@ beeline-log4j2.xml.template - ${project.parent.basedir}/llap-server/src/main/resources/llap-daemon-log4j.properties + ${project.parent.basedir}/llap-server/src/main/resources/llap-daemon-log4j2.xml conf - llap-daemon-log4j.properties.template + llap-daemon-log4j2.xml.template ${project.parent.basedir}/hcatalog/README.txt
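Taken together, the Java changes above drop the NDC wrapper classes and instead tag per-fragment log context with MDC keys directly inside the Callable, while the new log4j2 configuration is expected to reference those keys in its layout. A minimal sketch of the resulting pattern, assuming the log4j-1.2-api bridge is on the classpath; the class is illustrative (it mirrors the MDC handling TaskRunnerCallable.call() now does inline), and the %X pattern mentioned in the comment is an assumption about the shipped llap-daemon-log4j2.xml, not a quote from it:

import java.util.concurrent.Callable;
import org.apache.log4j.MDC;   // log4j-1.2-api bridge; entries land in log4j2's ThreadContext
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative only: mirrors the MDC put/remove pattern used by TaskRunnerCallable.
public class MdcTaggedTask implements Callable<Void> {
  private static final Logger LOG = LoggerFactory.getLogger(MdcTaggedTask.class);

  private final String appId;
  private final String fragId;

  public MdcTaggedTask(String appId, String fragId) {
    this.appId = appId;
    this.fragId = fragId;
  }

  @Override
  public Void call() {
    MDC.put("appId", appId);
    MDC.put("fragId", fragId);
    try {
      // With a layout containing %X{appId} and %X{fragId}, every line logged
      // from this thread carries both identifiers.
      LOG.info("fragment is executing");
      return null;
    } finally {
      MDC.remove("appId");
      MDC.remove("fragId");
    }
  }
}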