diff --git llap-server/src/java/org/apache/tez/dag/app/rm/ContainerFactory.java llap-server/src/java/org/apache/tez/dag/app/rm/ContainerFactory.java index 1ab3d15..89b7198 100644 --- llap-server/src/java/org/apache/tez/dag/app/rm/ContainerFactory.java +++ llap-server/src/java/org/apache/tez/dag/app/rm/ContainerFactory.java @@ -23,20 +23,17 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.tez.dag.app.AppContext; class ContainerFactory { final ApplicationAttemptId customAppAttemptId; AtomicLong nextId; - public ContainerFactory(AppContext appContext, long appIdLong) { + public ContainerFactory(ApplicationAttemptId appAttemptId, long appIdLong) { this.nextId = new AtomicLong(1); ApplicationId appId = - ApplicationId.newInstance(appIdLong, appContext.getApplicationAttemptId() - .getApplicationId().getId()); + ApplicationId.newInstance(appIdLong, appAttemptId.getApplicationId().getId()); this.customAppAttemptId = - ApplicationAttemptId.newInstance(appId, appContext.getApplicationAttemptId() - .getAttemptId()); + ApplicationAttemptId.newInstance(appId, appAttemptId.getAttemptId()); } public Container createContainer(Resource capability, Priority priority, String hostname, diff --git llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java index 17b17ee..b6ee3d8 100644 --- llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java +++ llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java @@ -60,7 +60,6 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.util.Clock; -import org.apache.tez.dag.app.AppContext; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -68,16 +67,18 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.yarn.util.SystemClock; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; +import org.apache.tez.serviceplugins.api.TaskScheduler; +import org.apache.tez.serviceplugins.api.TaskSchedulerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class LlapTaskSchedulerService extends TaskSchedulerService { +public class LlapTaskSchedulerService extends TaskScheduler { private static final Logger LOG = LoggerFactory.getLogger(LlapTaskSchedulerService.class); - private final ExecutorService appCallbackExecutor; - private final TaskSchedulerAppCallback appClientDelegate; + private final Configuration conf; // interface into the registry service private ServiceInstanceSet activeInstances; @@ -150,16 +151,17 @@ public int compare(Priority o1, Priority o2) { @VisibleForTesting StatsPerDag dagStats = new StatsPerDag(); - public LlapTaskSchedulerService(TaskSchedulerAppCallback appClient, AppContext appContext, - String clientHostname, int clientPort, String trackingUrl, long customAppIdIdentifier, - Configuration conf) { - // Accepting configuration here to allow setting up fields as final + public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) { + this(taskSchedulerContext, new SystemClock()); + } - super(LlapTaskSchedulerService.class.getName()); - this.appCallbackExecutor = createAppCallbackExecutorService(); - this.appClientDelegate = createAppCallbackDelegate(appClient); - this.clock = appContext.getClock(); - this.containerFactory = new ContainerFactory(appContext, customAppIdIdentifier); + @VisibleForTesting + public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock clock) { + super(taskSchedulerContext); + this.clock = clock; + this.conf = taskSchedulerContext.getInitialConfiguration(); + this.containerFactory = new ContainerFactory(taskSchedulerContext.getApplicationAttemptId(), + taskSchedulerContext.getCustomClusterIdentifier()); this.memoryPerInstance = conf.getInt(LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB, LlapConfiguration.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB_DEFAULT); @@ -206,12 +208,12 @@ public LlapTaskSchedulerService(TaskSchedulerAppCallback appClient, AppContext a } @Override - public void serviceInit(Configuration conf) { + public void initialize() { registry.init(conf); } @Override - public void serviceStart() throws IOException { + public void start() throws IOException { writeLock.lock(); try { nodeEnablerFuture = nodeEnabledExecutor.submit(nodeEnablerCallable); @@ -249,7 +251,7 @@ public void onFailure(Throwable t) { } @Override - public void serviceStop() { + public void shutdown() { writeLock.lock(); try { if (!this.isStopped.getAndSet(true)) { @@ -268,7 +270,6 @@ public void serviceStop() { if (registry != null) { registry.stop(); } - appCallbackExecutor.shutdownNow(); } } finally { writeLock.unlock(); @@ -479,7 +480,7 @@ public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEnd } finally { writeLock.unlock(); } - appClientDelegate.containerBeingReleased(taskInfo.containerId); + getContext().containerBeingReleased(taskInfo.containerId); return true; } @@ -507,11 +508,6 @@ private ExecutorService createAppCallbackExecutorService() { .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build()); } - @VisibleForTesting - TaskSchedulerAppCallback createAppCallbackDelegate(TaskSchedulerAppCallback realAppClient) { - return new TaskSchedulerAppCallbackWrapper(realAppClient, appCallbackExecutor); - } - /** * @param request the list of preferred hosts. null implies any host * @return @@ -791,7 +787,7 @@ private boolean scheduleTask(TaskInfo taskInfo) { writeLock.unlock(); } - appClientDelegate.taskAllocated(taskInfo.task, taskInfo.clientCookie, container); + getContext().taskAllocated(taskInfo.task, taskInfo.clientCookie, container); return true; } } @@ -842,7 +838,7 @@ private void preemptTasks(int forPriority, int numTasksToPreempt) { if (preemptedTaskList != null) { for (TaskInfo taskInfo : preemptedTaskList) { LOG.info("DBG: Preempting task {}", taskInfo); - appClientDelegate.preemptContainer(taskInfo.containerId); + getContext().preemptContainer(taskInfo.containerId); } } // The schedule loop will be triggered again when the deallocateTask request comes in for the diff --git llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java index b1cd15e..245c140 100644 --- llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java +++ llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -40,11 +41,11 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; -import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ControlledClock; -import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; +import org.apache.tez.serviceplugins.api.TaskSchedulerContext; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -57,7 +58,7 @@ private static final String HOST3 = "host3"; @Test (timeout = 5000) - public void testSimpleLocalAllocation() { + public void testSimpleLocalAllocation() throws IOException { TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(); @@ -79,7 +80,7 @@ public void testSimpleLocalAllocation() { } @Test (timeout = 5000) - public void testSimpleNoLocalityAllocation() { + public void testSimpleNoLocalityAllocation() throws IOException { TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(); try { @@ -101,7 +102,7 @@ public void testSimpleNoLocalityAllocation() { // task triggers the next task to be scheduled. @Test(timeout=5000) - public void testPreemption() throws InterruptedException { + public void testPreemption() throws InterruptedException, IOException { Priority priority1 = Priority.newInstance(1); Priority priority2 = Priority.newInstance(2); @@ -155,7 +156,7 @@ public void testPreemption() throws InterruptedException { } @Test(timeout=5000) - public void testNodeDisabled() { + public void testNodeDisabled() throws IOException { TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(10000l); try { Priority priority1 = Priority.newInstance(1); @@ -200,7 +201,7 @@ public void testNodeDisabled() { } // Flaky test disabled @Test(timeout=5000) - public void testNodeReEnabled() throws InterruptedException { + public void testNodeReEnabled() throws InterruptedException, IOException { // Based on actual timing. TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(1000l); try { @@ -274,22 +275,22 @@ public void testNodeReEnabled() throws InterruptedException { private static class TestTaskSchedulerServiceWrapper { static final Resource resource = Resource.newInstance(1024, 1); Configuration conf; - TaskSchedulerAppCallback mockAppCallback = mock(TaskSchedulerAppCallback.class); - AppContext mockAppContext = mock(AppContext.class); + TaskSchedulerContext mockAppCallback = mock(TaskSchedulerContext.class); ControlledClock clock = new ControlledClock(new SystemClock()); ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(1000, 1), 1); LlapTaskSchedulerServiceForTest ts; - TestTaskSchedulerServiceWrapper() { + TestTaskSchedulerServiceWrapper() throws IOException { this(2000l); } - TestTaskSchedulerServiceWrapper(long disableTimeoutMillis) { + TestTaskSchedulerServiceWrapper(long disableTimeoutMillis) throws IOException { this(disableTimeoutMillis, new String[]{HOST1, HOST2, HOST3}, 4, LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE_DEFAULT); } - TestTaskSchedulerServiceWrapper(long disableTimeoutMillis, String[] hosts, int numExecutors, int waitQueueSize) { + TestTaskSchedulerServiceWrapper(long disableTimeoutMillis, String[] hosts, int numExecutors, int waitQueueSize) throws + IOException { conf = new Configuration(); conf.setStrings(LlapConfiguration.LLAP_DAEMON_SERVICE_HOSTS, hosts); conf.setInt(LlapConfiguration.LLAP_DAEMON_NUM_EXECUTORS, numExecutors); @@ -298,12 +299,13 @@ public void testNodeReEnabled() throws InterruptedException { disableTimeoutMillis); conf.setBoolean(LlapTaskSchedulerServiceForTest.LLAP_TASK_SCHEDULER_IN_TEST, true); - doReturn(clock).when(mockAppContext).getClock(); - doReturn(appAttemptId).when(mockAppContext).getApplicationAttemptId(); + doReturn(appAttemptId).when(mockAppCallback).getApplicationAttemptId(); + doReturn(11111l).when(mockAppCallback).getCustomClusterIdentifier(); + doReturn(conf).when(mockAppCallback).getInitialConfiguration(); - ts = new LlapTaskSchedulerServiceForTest(mockAppCallback, mockAppContext, null, 0, null, 11111, conf); + ts = new LlapTaskSchedulerServiceForTest(mockAppCallback, clock); - ts.init(conf); + ts.initialize(); ts.start(); // One shceduler pass from the nodes that are added at startup awaitSchedulerRunNumber(1); @@ -329,7 +331,7 @@ void resetAppCallback() { } void shutdown() { - ts.stop(); + ts.shutdown(); } void allocateTask(Object task, String[] hosts, Priority priority, Object clientCookie) { @@ -358,18 +360,9 @@ void rejectExecution(Object task) { private boolean forTestSchedulerGoSignal = false; public LlapTaskSchedulerServiceForTest( - TaskSchedulerAppCallback appClient, AppContext appContext, String clientHostname, - int clientPort, String trackingUrl, long customAppIdIdentifier, - Configuration conf) { - super(appClient, appContext, clientHostname, clientPort, trackingUrl, customAppIdIdentifier, - conf); - this.inTest = conf.getBoolean(LLAP_TASK_SCHEDULER_IN_TEST, false); - } - - @Override - TaskSchedulerAppCallback createAppCallbackDelegate( - TaskSchedulerAppCallback realAppClient) { - return realAppClient; + TaskSchedulerContext appClient, Clock clock) { + super(appClient, clock); + this.inTest = appClient.getInitialConfiguration().getBoolean(LLAP_TASK_SCHEDULER_IN_TEST, false); } protected void schedulePendingTasks() {