diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 6a13b55e69..58951f56cf 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -26,6 +26,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.UgiFactory; import org.apache.hadoop.hive.conf.HiveConf; @@ -271,23 +273,23 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber(), vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo, amNodeId); - String[] localDirs = fragmentInfo.getLocalDirs(); - Preconditions.checkNotNull(localDirs); - if (LOG.isDebugEnabled()) { - LOG.debug("Dirs are: " + Arrays.toString(localDirs)); - } // May need to setup localDir for re-localization, which is usually setup as Environment.PWD. // Used for re-localization, to add the user specified configuration (conf_pb_binary_stream) - Configuration callableConf = new Configuration(getConfig()); + Supplier callableConf = Suppliers.memoize(new Supplier() { + @Override + public Configuration get() { + return new Configuration(getConfig()); + } + }); UserGroupInformation fsTaskUgi = fsUgiFactory == null ? null : fsUgiFactory.createUgi(); boolean isGuaranteed = request.hasIsGuaranteed() && request.getIsGuaranteed(); // enable the printing of (per daemon) LLAP task queue/run times via LLAP_TASK_TIME_SUMMARY ConfVars tezSummary = ConfVars.TEZ_EXEC_SUMMARY; ConfVars llapTasks = ConfVars.LLAP_TASK_TIME_SUMMARY; - boolean addTaskTimes = callableConf.getBoolean(tezSummary.varname, tezSummary.defaultBoolVal) - && callableConf.getBoolean(llapTasks.varname, llapTasks.defaultBoolVal); + boolean addTaskTimes = getConfig().getBoolean(tezSummary.varname, tezSummary.defaultBoolVal) + && getConfig().getBoolean(llapTasks.varname, llapTasks.defaultBoolVal); final String llapHost; if (UserGroupInformation.isSecurityEnabled()) { diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java index eae8e08540..bf4eea0c73 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java @@ -211,9 +211,9 @@ QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appId LOG.debug("Registering request for {} with the ShuffleHandler", queryIdentifier); } if (!vertex.getIsExternalSubmission()) { + String[] localDirs = (ShuffleHandler.get().isDirWatcherEnabled()) ? queryInfo.getLocalDirs() : null; ShuffleHandler.get() - .registerDag(appIdString, dagIdentifier, appToken, - user, queryInfo.getLocalDirs()); + .registerDag(appIdString, dagIdentifier, appToken, user, localDirs); } return queryInfo.registerFragment( diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index 36192520e3..91db781758 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; +import com.google.common.base.Supplier; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import com.google.common.util.concurrent.FutureCallback; @@ -93,7 +94,7 @@ public class TaskRunnerCallable extends CallableWithNdc { private static final Logger LOG = LoggerFactory.getLogger(TaskRunnerCallable.class); private final SubmitWorkRequestProto request; - private final Configuration conf; + private final Supplier conf; private final Map envMap; private final String pid = null; private final ObjectRegistryImpl objectRegistry; @@ -135,8 +136,9 @@ @VisibleForTesting public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo fragmentInfo, - Configuration conf, ExecutionContext executionContext, Map envMap, - Credentials credentials, long memoryAvailable, AMReporter amReporter, ConfParams confParams, + Supplier conf, ExecutionContext executionContext, + Map envMap, Credentials credentials, long memoryAvailable, + AMReporter amReporter, ConfParams confParams, LlapDaemonExecutorMetrics metrics, KilledTaskHandler killedTaskHandler, FragmentCompletionHandler fragmentCompleteHandler, HadoopShim tezHadoopShim, TezTaskAttemptID attemptId, SignableVertexSpec vertex, TezEvent initialEvent, @@ -254,7 +256,7 @@ protected TaskRunner2Result callInternal() throws Exception { @Override public LlapTaskUmbilicalProtocol run() throws Exception { return RPC.getProxy(LlapTaskUmbilicalProtocol.class, - LlapTaskUmbilicalProtocol.versionID, address, taskOwner, conf, socketFactory); + LlapTaskUmbilicalProtocol.versionID, address, taskOwner, conf.get(), socketFactory); } }); @@ -277,7 +279,7 @@ public LlapTaskUmbilicalProtocol run() throws Exception { try { synchronized (this) { if (shouldRunTask) { - taskRunner = new TezTaskRunner2(conf, fsTaskUgi, fragmentInfo.getLocalDirs(), + taskRunner = new TezTaskRunner2(conf.get(), fsTaskUgi, fragmentInfo.getLocalDirs(), taskSpec, vertex.getQueryIdentifier().getAppAttemptNumber(), serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor, objectRegistry, pid, executionContext, memoryAvailable, false, tezHadoopShim); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java index aff2c2ec39..9294fb3a63 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java @@ -431,6 +431,10 @@ public int getPort() { return port; } + public boolean isDirWatcherEnabled() { + return dirWatcher != null; + } + /** * Register an application and it's associated credentials and user information. * diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java index 50dec4759e..47efec144d 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java @@ -22,6 +22,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import com.google.common.base.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.llap.LlapNodeId; import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; @@ -212,7 +213,13 @@ public static SubmitWorkRequestProto createSubmitWorkRequestProto( public MockRequest(SubmitWorkRequestProto requestProto, QueryFragmentInfo fragmentInfo, boolean canFinish, boolean canFinishQueue, long workTime, TezEvent initialEvent, boolean isGuaranteed) { - super(requestProto, fragmentInfo, new Configuration(), new ExecutionContextImpl("localhost"), + super(requestProto, fragmentInfo, + new Supplier() { + @Override public Configuration get() { + return new Configuration(); + } + }, new ExecutionContextImpl( + "localhost"), null, new Credentials(), 0, mock(AMReporter.class), null, mock( LlapDaemonExecutorMetrics.class), mock(KilledTaskHandler.class), mock( FragmentCompletionHandler.class), new DefaultHadoopShim(), null,