diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index b0bf844..a2a55cc 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -60,6 +60,8 @@ import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.hadoop.shim.HadoopShim; +import org.apache.tez.hadoop.shim.HadoopShimsLoader; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,6 +87,7 @@ private final Configuration conf; private final TaskRunnerCallable.ConfParams confParams; private final KilledTaskHandler killedTaskHandler = new KilledTaskHandlerImpl(); + private final HadoopShim tezHadoopShim; public ContainerRunnerImpl(Configuration conf, int numExecutors, int waitQueueSize, boolean enablePreemption, String[] localDirsBase, AtomicReference localShufflePort, @@ -122,6 +125,7 @@ public ContainerRunnerImpl(Configuration conf, int numExecutors, int waitQueueSi conf.getInt(TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT, TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT) ); + tezHadoopShim = new HadoopShimsLoader(conf).getHadoopShim(); LOG.info("ContainerRunnerImpl config: " + "memoryPerExecutorDerviced=" + memoryPerExecutor @@ -207,7 +211,7 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, new Configuration(getConfig()), new LlapExecutionContext(localAddress.get().getHostName(), queryTracker), env, credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler, - this); + this, tezHadoopShim); submissionState = executorService.schedule(callable); if (LOG.isInfoEnabled()) { diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index b60f71f..ede2a03 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -51,6 +51,7 @@ import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.hadoop.shim.HadoopShim; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; @@ -100,6 +101,7 @@ private final LlapDaemonExecutorMetrics metrics; private final String requestId; private final String queryId; + private final HadoopShim tezHadoopShim; private boolean shouldRunTask = true; final Stopwatch runtimeWatch = new Stopwatch(); final Stopwatch killtimerWatch = new Stopwatch(); @@ -115,7 +117,8 @@ public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo frag long memoryAvailable, AMReporter amReporter, ConfParams confParams, LlapDaemonExecutorMetrics metrics, KilledTaskHandler killedTaskHandler, - FragmentCompletionHandler fragmentCompleteHandler) { + FragmentCompletionHandler fragmentCompleteHandler, + HadoopShim tezHadoopShim) { this.request = request; this.fragmentInfo = fragmentInfo; this.conf = conf; @@ -139,6 +142,7 @@ public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo frag this.queryId = request.getFragmentSpec().getDagName(); this.killedTaskHandler = killedTaskHandler; this.fragmentCompletionHanler = fragmentCompleteHandler; + this.tezHadoopShim = tezHadoopShim; } public long getStartTime() { @@ -216,7 +220,7 @@ public LlapTaskUmbilicalProtocol run() throws Exception { serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor, objectRegistry, pid, - executionContext, memoryAvailable, false); + executionContext, memoryAvailable, false, tezHadoopShim); } } if (taskRunner == null) { diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java index 38af07e..d3ba6dd 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java @@ -33,6 +33,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.task.EndReason; import org.apache.tez.runtime.task.TaskRunner2Result; @@ -125,7 +126,7 @@ public MockRequest(SubmitWorkRequestProto requestProto, new ExecutionContextImpl("localhost"), null, new Credentials(), 0, null, null, mock( LlapDaemonExecutorMetrics.class), mock(KilledTaskHandler.class), mock( - FragmentCompletionHandler.class)); + FragmentCompletionHandler.class), new DefaultHadoopShim()); this.workTime = workTime; this.canFinish = canFinish; } diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java index ebfb430..73df985 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java @@ -39,6 +39,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.task.EndReason; import org.apache.tez.runtime.task.TaskRunner2Result; @@ -58,7 +59,7 @@ public MockRequest(SubmitWorkRequestProto requestProto, super(requestProto, mock(QueryFragmentInfo.class), conf, new ExecutionContextImpl("localhost"), null, cred, 0, null, null, null, mock(KilledTaskHandler.class), mock( - FragmentCompletionHandler.class)); + FragmentCompletionHandler.class), new DefaultHadoopShim()); this.workTime = workTime; this.canFinish = canFinish; }