diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 6a13b55e69..9c73747778 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -25,6 +25,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.UgiFactory; @@ -271,23 +272,19 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber(), vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo, amNodeId); - String[] localDirs = fragmentInfo.getLocalDirs(); - Preconditions.checkNotNull(localDirs); - if (LOG.isDebugEnabled()) { - LOG.debug("Dirs are: " + Arrays.toString(localDirs)); - } // May need to setup localDir for re-localization, which is usually setup as Environment.PWD. // Used for re-localization, to add the user specified configuration (conf_pb_binary_stream) - Configuration callableConf = new Configuration(getConfig()); + // Lazy create conf object, as it gets expensive in this codepath. + Supplier callableConf = () -> new Configuration(getConfig()); UserGroupInformation fsTaskUgi = fsUgiFactory == null ? null : fsUgiFactory.createUgi(); boolean isGuaranteed = request.hasIsGuaranteed() && request.getIsGuaranteed(); // enable the printing of (per daemon) LLAP task queue/run times via LLAP_TASK_TIME_SUMMARY ConfVars tezSummary = ConfVars.TEZ_EXEC_SUMMARY; ConfVars llapTasks = ConfVars.LLAP_TASK_TIME_SUMMARY; - boolean addTaskTimes = callableConf.getBoolean(tezSummary.varname, tezSummary.defaultBoolVal) - && callableConf.getBoolean(llapTasks.varname, llapTasks.defaultBoolVal); + boolean addTaskTimes = getConfig().getBoolean(tezSummary.varname, tezSummary.defaultBoolVal) + && getConfig().getBoolean(llapTasks.varname, llapTasks.defaultBoolVal); final String llapHost; if (UserGroupInformation.isSecurityEnabled()) { diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java index eae8e08540..bf4eea0c73 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java @@ -211,9 +211,9 @@ QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appId LOG.debug("Registering request for {} with the ShuffleHandler", queryIdentifier); } if (!vertex.getIsExternalSubmission()) { + String[] localDirs = (ShuffleHandler.get().isDirWatcherEnabled()) ? queryInfo.getLocalDirs() : null; ShuffleHandler.get() - .registerDag(appIdString, dagIdentifier, appToken, - user, queryInfo.getLocalDirs()); + .registerDag(appIdString, dagIdentifier, appToken, user, localDirs); } return queryInfo.registerFragment( diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index 36192520e3..bc26dc0475 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -86,6 +86,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; /** * @@ -93,7 +94,7 @@ public class TaskRunnerCallable extends CallableWithNdc { private static final Logger LOG = LoggerFactory.getLogger(TaskRunnerCallable.class); private final SubmitWorkRequestProto request; - private final Configuration conf; + private final Supplier conf; private final Map envMap; private final String pid = null; private final ObjectRegistryImpl objectRegistry; @@ -135,8 +136,9 @@ @VisibleForTesting public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo fragmentInfo, - Configuration conf, ExecutionContext executionContext, Map envMap, - Credentials credentials, long memoryAvailable, AMReporter amReporter, ConfParams confParams, + Supplier conf, ExecutionContext executionContext, + Map envMap, Credentials credentials, long memoryAvailable, + AMReporter amReporter, ConfParams confParams, LlapDaemonExecutorMetrics metrics, KilledTaskHandler killedTaskHandler, FragmentCompletionHandler fragmentCompleteHandler, HadoopShim tezHadoopShim, TezTaskAttemptID attemptId, SignableVertexSpec vertex, TezEvent initialEvent, @@ -192,6 +194,7 @@ protected TaskRunner2Result callInternal() throws Exception { setMDCFromNDC(); try { + final Configuration config = conf.get(); isStarted.set(true); this.startTime = System.currentTimeMillis(); threadName = Thread.currentThread().getName(); @@ -254,7 +257,7 @@ protected TaskRunner2Result callInternal() throws Exception { @Override public LlapTaskUmbilicalProtocol run() throws Exception { return RPC.getProxy(LlapTaskUmbilicalProtocol.class, - LlapTaskUmbilicalProtocol.versionID, address, taskOwner, conf, socketFactory); + LlapTaskUmbilicalProtocol.versionID, address, taskOwner, config, socketFactory); } }); @@ -277,7 +280,7 @@ public LlapTaskUmbilicalProtocol run() throws Exception { try { synchronized (this) { if (shouldRunTask) { - taskRunner = new TezTaskRunner2(conf, fsTaskUgi, fragmentInfo.getLocalDirs(), + taskRunner = new TezTaskRunner2(config, fsTaskUgi, fragmentInfo.getLocalDirs(), taskSpec, vertex.getQueryIdentifier().getAppAttemptNumber(), serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor, objectRegistry, pid, executionContext, memoryAvailable, false, tezHadoopShim); diff --git llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java index aff2c2ec39..9294fb3a63 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java @@ -431,6 +431,10 @@ public int getPort() { return port; } + public boolean isDirWatcherEnabled() { + return dirWatcher != null; + } + /** * Register an application and it's associated credentials and user information. * diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java index 50dec4759e..af3f292ca2 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java @@ -22,6 +22,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import com.google.common.base.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.llap.LlapNodeId; import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; @@ -212,7 +213,7 @@ public static SubmitWorkRequestProto createSubmitWorkRequestProto( public MockRequest(SubmitWorkRequestProto requestProto, QueryFragmentInfo fragmentInfo, boolean canFinish, boolean canFinishQueue, long workTime, TezEvent initialEvent, boolean isGuaranteed) { - super(requestProto, fragmentInfo, new Configuration(), new ExecutionContextImpl("localhost"), + super(requestProto, fragmentInfo, Configuration::new, new ExecutionContextImpl("localhost"), null, new Credentials(), 0, mock(AMReporter.class), null, mock( LlapDaemonExecutorMetrics.class), mock(KilledTaskHandler.class), mock( FragmentCompletionHandler.class), new DefaultHadoopShim(), null, diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java index 8ae00b9c87..93ca9f205c 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestContainerRunnerImpl.java @@ -174,7 +174,9 @@ public void testRegisterDag() throws Exception { containerRunner.submitWork(sRequest); Assert.assertEquals(ShuffleHandler.get().getRegisteredApps().size(), 1); Assert.assertEquals(ShuffleHandler.get().getRegisteredApps().get(appId), dagId); - Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().size(), 1); - Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().get(appId), dagId); + if (ShuffleHandler.get().isDirWatcherEnabled()) { + Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().size(), 1); + Assert.assertEquals(ShuffleHandler.get().getRegisteredDirectories().get(appId), dagId); + } } }