diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java index 07f96f5..3f1f58f 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapContainerLauncher.java @@ -14,52 +14,30 @@ package org.apache.hadoop.hive.llap.tezplugins; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.util.Clock; -import org.apache.tez.dag.app.AppContext; -import org.apache.tez.dag.app.TaskAttemptListener; -import org.apache.tez.dag.app.launcher.ContainerLauncher; -import org.apache.tez.dag.app.rm.NMCommunicatorEvent; -import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent; -import org.apache.tez.dag.app.rm.container.AMContainerEvent; -import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched; -import org.apache.tez.dag.app.rm.container.AMContainerEventType; -import org.apache.tez.dag.history.DAGHistoryEvent; -import org.apache.tez.dag.history.events.ContainerLaunchedEvent; +import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; +import org.apache.tez.serviceplugins.api.ContainerLauncher; +import org.apache.tez.serviceplugins.api.ContainerLauncherContext; +import org.apache.tez.serviceplugins.api.ContainerStopRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class LlapContainerLauncher extends AbstractService implements ContainerLauncher { - static final Log LOG = LogFactory.getLog(LlapContainerLauncher.class); +public class LlapContainerLauncher extends ContainerLauncher { + private static final Logger LOG = LoggerFactory.getLogger(LlapContainerLauncher.class); - private final AppContext context; - private final Clock clock; + public LlapContainerLauncher(ContainerLauncherContext containerLauncherContext) { + super(LlapContainerLauncher.class.getName(), containerLauncherContext); + } - public LlapContainerLauncher(AppContext appContext, Configuration conf, - TaskAttemptListener tal) { - super(LlapContainerLauncher.class.getName()); - this.context = appContext; - this.clock = appContext.getClock(); + @Override + public void launchContainer(ContainerLaunchRequest containerLaunchRequest) { + LOG.info("No-op launch for container: " + containerLaunchRequest.getContainerId() + + " succeeded on host: " + containerLaunchRequest.getNodeId()); + getContext().containerLaunched(containerLaunchRequest.getContainerId()); } @Override - public void handle(NMCommunicatorEvent event) { - switch(event.getType()) { - case CONTAINER_LAUNCH_REQUEST: - final NMCommunicatorLaunchRequestEvent launchEvent = (NMCommunicatorLaunchRequestEvent) event; - LOG.info("No-op launch for container: " + launchEvent.getContainerId() + " succeeded on host: " + launchEvent.getNodeId()); - context.getEventHandler().handle(new AMContainerEventLaunched(launchEvent.getContainerId())); - ContainerLaunchedEvent lEvt = new ContainerLaunchedEvent( - launchEvent.getContainerId(), clock.getTime(), context.getApplicationAttemptId()); - context.getHistoryHandler().handle(new DAGHistoryEvent( - null, lEvt)); - break; - case CONTAINER_STOP_REQUEST: - LOG.info("DEBUG: Ignoring STOP_REQUEST for event: " + event); - context.getEventHandler().handle(new AMContainerEvent(event.getContainerId(), - AMContainerEventType.C_NM_STOP_SENT)); - break; - } + public void stopContainer(ContainerStopRequest containerStopRequest) { + LOG.info("DEBUG: Ignoring STOP_REQUEST for event: " + containerStopRequest); + getContext().containerStopRequested(containerStopRequest.getContainerId()); } -} +} \ No newline at end of file diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index 2305b8c..dc06c97 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -31,7 +31,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.llap.LlapNodeId; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; -import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto; @@ -55,8 +54,6 @@ import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.tez.common.TezTaskUmbilicalProtocol; import org.apache.tez.common.security.JobTokenSecretManager; -import org.apache.tez.dag.api.ContainerEndReason; -import org.apache.tez.dag.api.TaskAttemptEndReason; import org.apache.tez.dag.api.TaskCommunicatorContext; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; @@ -67,6 +64,8 @@ import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; import org.apache.tez.runtime.api.impl.TezHeartbeatResponse; +import org.apache.tez.serviceplugins.api.ContainerEndReason; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java index 5a2b77d..17b17ee 100644 --- llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java +++ llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java @@ -60,7 +60,6 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.util.Clock; -import org.apache.tez.dag.api.TaskAttemptEndReason; import org.apache.tez.dag.app.AppContext; import com.google.common.annotations.VisibleForTesting; @@ -69,6 +68,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java index 6b62a37..b1cd15e 100644 --- llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java +++ llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java @@ -41,10 +41,10 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.util.SystemClock; -import org.apache.tez.dag.api.TaskAttemptEndReason; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ControlledClock; import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback; +import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.junit.Test; import org.mockito.ArgumentCaptor;