diff --git llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java index b6633b8..0c90fe8 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java @@ -140,9 +140,10 @@ public LlapConfiguration() { LLAP_DAEMON_PREFIX + "task.scheduler.wait.queue.size"; public static final int LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE_DEFAULT = 10; - public static final String LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_FAIR_ORDERING = - LLAP_DAEMON_PREFIX + "task.scheduler.wait.queue.fair.ordering"; - public static final boolean LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_FAIR_ORDERING_DEFAULT = false; + public static final String LLAP_DAEMON_WAIT_QUEUE_SCHEDULER_CLASS_NAME = + LLAP_DAEMON_PREFIX + "wait.queue.comparator.class.name"; + public static final String LLAP_DAEMON_WAIT_QUEUE_SCHEDULER_CLASS_NAME_DEFAULT = + "org.apache.hadoop.hive.llap.daemon.impl.comparator.ShortestJobFirstComparator"; public static final String LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION = LLAP_DAEMON_PREFIX + "task.scheduler.enable.preemption"; diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 710c593..411d965 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -96,9 +96,10 @@ public ContainerRunnerImpl(Configuration conf, int numExecutors, int waitQueueSi this.queryTracker = new QueryTracker(conf, localDirsBase); addIfService(queryTracker); - boolean useFairOrdering = conf.getBoolean(LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_FAIR_ORDERING, - 
LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_FAIR_ORDERING_DEFAULT); - this.executorService = new TaskExecutorService(numExecutors, waitQueueSize, useFairOrdering, + String waitQueueSchedulerClassName = + conf.get(LlapConfiguration.LLAP_DAEMON_WAIT_QUEUE_SCHEDULER_CLASS_NAME, + LlapConfiguration.LLAP_DAEMON_WAIT_QUEUE_SCHEDULER_CLASS_NAME_DEFAULT); + this.executorService = new TaskExecutorService(numExecutors, waitQueueSize, waitQueueSchedulerClassName, enablePreemption); AuxiliaryServiceHelper.setServiceDataIntoEnv( TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID, diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index f99c05d..badeb63 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hive.llap.daemon.impl; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.text.SimpleDateFormat; import java.util.Comparator; import java.util.Date; @@ -103,14 +105,32 @@ private final Object lock = new Object(); - public TaskExecutorService(int numExecutors, int waitQueueSize, boolean useFairOrdering, + public TaskExecutorService(int numExecutors, int waitQueueSize, String waitQueueComparatorClassName, boolean enablePreemption) { super(TaskExecutorService.class.getSimpleName()); + LOG.info("TaskExecutorService is being setup with parameters: " + + "numExecutors=" + numExecutors + + ", waitQueueSize=" + waitQueueSize + + ", waitQueueComparatorClassName=" + waitQueueComparatorClassName + + ", enablePreemption=" + enablePreemption); + final Comparator waitQueueComparator; - if (useFairOrdering) { - waitQueueComparator = new FirstInFirstOutComparator(); - } else { - waitQueueComparator = new 
ShortestJobFirstComparator(); + try { + Class waitQueueComparatorClazz = + (Class) Class.forName( + waitQueueComparatorClassName); + Constructor ctor = waitQueueComparatorClazz.getConstructor(null); + waitQueueComparator = ctor.newInstance(null); + } catch (ClassNotFoundException e) { + throw new RuntimeException( + "Failed to load wait queue comparator, class=" + waitQueueComparatorClassName, e); + } catch (NoSuchMethodException e) { + throw new RuntimeException("Failed to find constructor for wait queue comparator, class=" + + waitQueueComparatorClassName, e); + } catch (InvocationTargetException | InstantiationException | IllegalAccessException e) { + throw new RuntimeException( + "Failed to find instantiate wait queue comparator, class=" + waitQueueComparatorClassName, + e); } this.waitQueue = new EvictingPriorityBlockingQueue<>(waitQueueComparator, waitQueueSize); this.threadPoolExecutor = new ThreadPoolExecutor(numExecutors, // core pool size @@ -137,10 +157,7 @@ public TaskExecutorService(int numExecutors, int waitQueueSize, boolean useFairO ListenableFuture future = waitQueueExecutorService.submit(new WaitQueueWorker()); Futures.addCallback(future, new WaitQueueWorkerCallback()); - LOG.info("TaskExecutorService started with parameters: " - + "numExecutors=" + numExecutors - + ", waitQueueSize=" + waitQueueSize - + ", enablePreemption=" + enablePreemption); + } @Override @@ -577,116 +594,7 @@ private void shutdownExecutor(ExecutorService executorService) { } } - // if map tasks and reduce tasks are in finishable state then priority is given to the task - // that has less number of pending tasks (shortest job) - @VisibleForTesting - public static class ShortestJobFirstComparator implements Comparator { - - @Override - public int compare(TaskWrapper t1, TaskWrapper t2) { - TaskRunnerCallable o1 = t1.getTaskRunnerCallable(); - TaskRunnerCallable o2 = t2.getTaskRunnerCallable(); - boolean o1CanFinish = o1.canFinish(); - boolean o2CanFinish = o2.canFinish(); - if 
(o1CanFinish == true && o2CanFinish == false) { - return -1; - } else if (o1CanFinish == false && o2CanFinish == true) { - return 1; - } - - FragmentRuntimeInfo fri1 = o1.getFragmentRuntimeInfo(); - FragmentRuntimeInfo fri2 = o2.getFragmentRuntimeInfo(); - - // Check if these belong to the same task, and work with withinDagPriority - if (o1.getQueryId().equals(o2.getQueryId())) { - // Same Query - // Within dag priority - lower values indicate higher priority. - if (fri1.getWithinDagPriority() < fri2.getWithinDagPriority()) { - return -1; - } else if (fri1.getWithinDagPriority() > fri2.getWithinDagPriority()){ - return 1; - } - } - - // Compute knownPending tasks. selfAndUpstream indicates task counts for current vertex and - // it's parent hierarchy. selfAndUpstreamComplete indicates how many of these have completed. - int knownPending1 = fri1.getNumSelfAndUpstreamTasks() - fri1.getNumSelfAndUpstreamCompletedTasks(); - int knownPending2 = fri2.getNumSelfAndUpstreamTasks() - fri2.getNumSelfAndUpstreamCompletedTasks(); - if (knownPending1 < knownPending2) { - return -1; - } else if (knownPending1 > knownPending2) { - return 1; - } - - if (fri1.getFirstAttemptStartTime() < fri2.getFirstAttemptStartTime()) { - return -1; - } else if (fri1.getFirstAttemptStartTime() > fri2.getFirstAttemptStartTime()) { - return 1; - } - return 0; - } - } - - // if map tasks and reduce tasks are in finishable state then priority is given to the task in - // the following order - // 1) Dag start time - // 2) Within dag priority - // 3) Attempt start time - // 4) Vertex parallelism - @VisibleForTesting - public static class FirstInFirstOutComparator implements Comparator { - - @Override - public int compare(TaskWrapper t1, TaskWrapper t2) { - TaskRunnerCallable o1 = t1.getTaskRunnerCallable(); - TaskRunnerCallable o2 = t2.getTaskRunnerCallable(); - boolean o1CanFinish = o1.canFinish(); - boolean o2CanFinish = o2.canFinish(); - if (o1CanFinish == true && o2CanFinish == false) { - return 
-1; - } else if (o1CanFinish == false && o2CanFinish == true) { - return 1; - } - - FragmentRuntimeInfo fri1 = o1.getFragmentRuntimeInfo(); - FragmentRuntimeInfo fri2 = o2.getFragmentRuntimeInfo(); - - if (fri1.getDagStartTime() < fri2.getDagStartTime()) { - return -1; - } else if (fri1.getDagStartTime() > fri2.getDagStartTime()) { - return 1; - } - - // Check if these belong to the same task, and work with withinDagPriority - if (o1.getQueryId().equals(o2.getQueryId())) { - // Same Query - // Within dag priority - lower values indicate higher priority. - if (fri1.getWithinDagPriority() < fri2.getWithinDagPriority()) { - return -1; - } else if (fri1.getWithinDagPriority() > fri2.getWithinDagPriority()){ - return 1; - } - } - - if (fri1.getFirstAttemptStartTime() < fri2.getFirstAttemptStartTime()) { - return -1; - } else if (fri1.getFirstAttemptStartTime() > fri2.getFirstAttemptStartTime()) { - return 1; - } - // Compute knownPending tasks. selfAndUpstream indicates task counts for current vertex and - // it's parent hierarchy. selfAndUpstreamComplete indicates how many of these have completed. 
- int knownPending1 = fri1.getNumSelfAndUpstreamTasks() - fri1.getNumSelfAndUpstreamCompletedTasks(); - int knownPending2 = fri2.getNumSelfAndUpstreamTasks() - fri2.getNumSelfAndUpstreamCompletedTasks(); - if (knownPending1 < knownPending2) { - return -1; - } else if (knownPending1 > knownPending2) { - return 1; - } - - return 0; - } - } @VisibleForTesting public static class PreemptionQueueComparator implements Comparator { diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index 6ceb2e5..e0bd48a 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.CallableWithNdc; import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; @@ -105,7 +106,8 @@ private final AtomicBoolean isCompleted = new AtomicBoolean(false); private final AtomicBoolean killInvoked = new AtomicBoolean(false); - TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo fragmentInfo, + @VisibleForTesting + public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo fragmentInfo, Configuration conf, ExecutionContext executionContext, Map envMap, Credentials credentials, diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/FirstInFirstOutComparator.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/FirstInFirstOutComparator.java new file mode 100644 index 0000000..447fc7b --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/FirstInFirstOutComparator.java @@ -0,0 +1,81 
@@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.daemon.impl.comparator; + +import java.util.Comparator; + +import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper; +import org.apache.hadoop.hive.llap.daemon.impl.TaskRunnerCallable; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; + +// if map tasks and reduce tasks are in finishable state then priority is given to the task in +// the following order +// 1) Dag start time +// 2) Within dag priority +// 3) Attempt start time +// 4) Vertex parallelism +public class FirstInFirstOutComparator implements Comparator { + + @Override + public int compare(TaskWrapper t1, TaskWrapper t2) { + TaskRunnerCallable o1 = t1.getTaskRunnerCallable(); + TaskRunnerCallable o2 = t2.getTaskRunnerCallable(); + boolean o1CanFinish = o1.canFinish(); + boolean o2CanFinish = o2.canFinish(); + if (o1CanFinish == true && o2CanFinish == false) { + return -1; + } else if (o1CanFinish == false && o2CanFinish == true) { + return 1; + } + + LlapDaemonProtocolProtos.FragmentRuntimeInfo fri1 = o1.getFragmentRuntimeInfo(); + LlapDaemonProtocolProtos.FragmentRuntimeInfo fri2 = o2.getFragmentRuntimeInfo(); + + if (fri1.getDagStartTime() < fri2.getDagStartTime()) { + return -1; + } else if (fri1.getDagStartTime() > fri2.getDagStartTime()) { + return 1; + } + + // Check if these belong to the same task, and work with withinDagPriority + if 
(o1.getQueryId().equals(o2.getQueryId())) { + // Same Query + // Within dag priority - lower values indicate higher priority. + if (fri1.getWithinDagPriority() < fri2.getWithinDagPriority()) { + return -1; + } else if (fri1.getWithinDagPriority() > fri2.getWithinDagPriority()){ + return 1; + } + } + + if (fri1.getFirstAttemptStartTime() < fri2.getFirstAttemptStartTime()) { + return -1; + } else if (fri1.getFirstAttemptStartTime() > fri2.getFirstAttemptStartTime()) { + return 1; + } + + // Compute knownPending tasks. selfAndUpstream indicates task counts for current vertex and + // its parent hierarchy. selfAndUpstreamComplete indicates how many of these have completed. + int knownPending1 = fri1.getNumSelfAndUpstreamTasks() - fri1.getNumSelfAndUpstreamCompletedTasks(); + int knownPending2 = fri2.getNumSelfAndUpstreamTasks() - fri2.getNumSelfAndUpstreamCompletedTasks(); + if (knownPending1 < knownPending2) { + return -1; + } else if (knownPending1 > knownPending2) { + return 1; + } + + return 0; + } +} diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java new file mode 100644 index 0000000..238ae9e --- /dev/null +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java @@ -0,0 +1,70 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.llap.daemon.impl.comparator; + +import java.util.Comparator; + +import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper; +import org.apache.hadoop.hive.llap.daemon.impl.TaskRunnerCallable; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; + +// if map tasks and reduce tasks are in finishable state then priority is given to the task +// that has fewer pending tasks (shortest job) +public class ShortestJobFirstComparator implements Comparator { + + @Override + public int compare(TaskWrapper t1, TaskWrapper t2) { + TaskRunnerCallable o1 = t1.getTaskRunnerCallable(); + TaskRunnerCallable o2 = t2.getTaskRunnerCallable(); + boolean o1CanFinish = o1.canFinish(); + boolean o2CanFinish = o2.canFinish(); + if (o1CanFinish == true && o2CanFinish == false) { + return -1; + } else if (o1CanFinish == false && o2CanFinish == true) { + return 1; + } + + LlapDaemonProtocolProtos.FragmentRuntimeInfo fri1 = o1.getFragmentRuntimeInfo(); + LlapDaemonProtocolProtos.FragmentRuntimeInfo fri2 = o2.getFragmentRuntimeInfo(); + + // Check if these belong to the same query, and work with withinDagPriority + if (o1.getQueryId().equals(o2.getQueryId())) { + // Same Query + // Within dag priority - lower values indicate higher priority. + if (fri1.getWithinDagPriority() < fri2.getWithinDagPriority()) { + return -1; + } else if (fri1.getWithinDagPriority() > fri2.getWithinDagPriority()){ + return 1; + } + } + + // Compute knownPending tasks. selfAndUpstream indicates task counts for current vertex and + // its parent hierarchy. selfAndUpstreamComplete indicates how many of these have completed. 
+ int knownPending1 = fri1.getNumSelfAndUpstreamTasks() - fri1.getNumSelfAndUpstreamCompletedTasks(); + int knownPending2 = fri2.getNumSelfAndUpstreamTasks() - fri2.getNumSelfAndUpstreamCompletedTasks(); + if (knownPending1 < knownPending2) { + return -1; + } else if (knownPending1 > knownPending2) { + return 1; + } + + if (fri1.getFirstAttemptStartTime() < fri2.getFirstAttemptStartTime()) { + return -1; + } else if (fri1.getFirstAttemptStartTime() > fri2.getFirstAttemptStartTime()) { + return 1; + } + return 0; + } +} diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java new file mode 100644 index 0000000..ec1ffcf --- /dev/null +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java @@ -0,0 +1,238 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.llap.daemon.impl; + +import static org.mockito.Mockito.mock; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; +import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; +import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.runtime.api.impl.ExecutionContextImpl; +import org.apache.tez.runtime.task.EndReason; +import org.apache.tez.runtime.task.TaskRunner2Result; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TaskExecutorTestHelpers { + + private static final Logger LOG = LoggerFactory.getLogger(TestTaskExecutorService.class); + + public static MockRequest createMockRequest(int fragmentNum, int parallelism, long startTime, + boolean canFinish, long workTime) { + SubmitWorkRequestProto + requestProto = createSubmitWorkRequestProto(fragmentNum, parallelism, + startTime); + MockRequest mockRequest = new MockRequest(requestProto, canFinish, workTime); + return mockRequest; + } + + public static TaskExecutorService.TaskWrapper createTaskWrapper( + SubmitWorkRequestProto request, boolean canFinish, int workTime) { + MockRequest mockRequest = new MockRequest(request, canFinish, workTime); + TaskExecutorService.TaskWrapper + taskWrapper = new 
TaskExecutorService.TaskWrapper(mockRequest, null); + return taskWrapper; + } + + + public static SubmitWorkRequestProto createSubmitWorkRequestProto( + int fragmentNumber, int selfAndUpstreamParallelism, + long attemptStartTime) { + return createSubmitWorkRequestProto(fragmentNumber, selfAndUpstreamParallelism, 0, + attemptStartTime, 1); + } + + public static SubmitWorkRequestProto createSubmitWorkRequestProto( + int fragmentNumber, int selfAndUpstreamParallelism, + int selfAndUpstreamComplete, + long attemptStartTime, int withinDagPriority) { + ApplicationId appId = ApplicationId.newInstance(9999, 72); + TezDAGID dagId = TezDAGID.getInstance(appId, 1); + TezVertexID vId = TezVertexID.getInstance(dagId, 35); + TezTaskID tId = TezTaskID.getInstance(vId, 389); + TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tId, fragmentNumber); + return SubmitWorkRequestProto + .newBuilder() + .setFragmentSpec( + LlapDaemonProtocolProtos.FragmentSpecProto + .newBuilder() + .setAttemptNumber(0) + .setDagName("MockDag") + .setFragmentNumber(fragmentNumber) + .setVertexName("MockVertex") + .setProcessorDescriptor( + LlapDaemonProtocolProtos.EntityDescriptorProto.newBuilder() + .setClassName("MockProcessor").build()) + .setFragmentIdentifierString(taId.toString()).build()).setAmHost("localhost") + .setAmPort(12345).setAppAttemptNumber(0).setApplicationIdString("MockApp_1") + .setContainerIdString("MockContainer_1").setUser("MockUser") + .setTokenIdentifier("MockToken_1") + .setFragmentRuntimeInfo(LlapDaemonProtocolProtos + .FragmentRuntimeInfo + .newBuilder() + .setFirstAttemptStartTime(attemptStartTime) + .setNumSelfAndUpstreamTasks(selfAndUpstreamParallelism) + .setNumSelfAndUpstreamCompletedTasks(selfAndUpstreamComplete) + .setWithinDagPriority(withinDagPriority) + .build()) + .build(); + } + + public static class MockRequest extends TaskRunnerCallable { + private final long workTime; + private final boolean canFinish; + + private final AtomicBoolean isStarted = new 
AtomicBoolean(false); + private final AtomicBoolean isFinished = new AtomicBoolean(false); + private final AtomicBoolean wasKilled = new AtomicBoolean(false); + private final AtomicBoolean wasInterrupted = new AtomicBoolean(false); + + private final ReentrantLock lock = new ReentrantLock(); + private final Condition startedCondition = lock.newCondition(); + private final Condition sleepCondition = lock.newCondition(); + private final Condition finishedCondition = lock.newCondition(); + + public MockRequest(SubmitWorkRequestProto requestProto, + boolean canFinish, long workTime) { + super(requestProto, mock(QueryFragmentInfo.class), new Configuration(), + new ExecutionContextImpl("localhost"), null, new Credentials(), 0, null, null, mock( + LlapDaemonExecutorMetrics.class), + mock(KilledTaskHandler.class), mock( + FragmentCompletionHandler.class)); + this.workTime = workTime; + this.canFinish = canFinish; + } + + @Override + protected TaskRunner2Result callInternal() { + try { + logInfo(super.getRequestId() + " is executing..", null); + lock.lock(); + try { + isStarted.set(true); + startedCondition.signal(); + } finally { + lock.unlock(); + } + + lock.lock(); + try { + sleepCondition.await(workTime, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + wasInterrupted.set(true); + return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false); + } finally { + lock.unlock(); + } + if (wasKilled.get()) { + return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false); + } else { + return new TaskRunner2Result(EndReason.SUCCESS, null, false); + } + } finally { + lock.lock(); + try { + isFinished.set(true); + finishedCondition.signal(); + } finally { + lock.unlock(); + } + } + } + + @Override + public void killTask() { + lock.lock(); + try { + wasKilled.set(true); + sleepCondition.signal(); + } finally { + lock.unlock(); + } + } + + boolean hasStarted() { + return isStarted.get(); + } + + boolean hasFinished() { + return isFinished.get(); + } + + 
boolean wasPreempted() { + return wasKilled.get(); + } + + void complete() { + lock.lock(); + try { + sleepCondition.signal(); + } finally { + lock.unlock(); + } + } + + void awaitStart() throws InterruptedException { + lock.lock(); + try { + while (!isStarted.get()) { + startedCondition.await(); + } + } finally { + lock.unlock(); + } + } + + void awaitEnd() throws InterruptedException { + lock.lock(); + try { + while (!isFinished.get()) { + finishedCondition.await(); + } + } finally { + lock.unlock(); + } + } + + + @Override + public boolean canFinish() { + return canFinish; + } + } + + private static void logInfo(String message, Throwable t) { + LOG.info(message, t); + } + + private static void logInfo(String message) { + logInfo(message, null); + } + +} diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java index 7a01b39..34ab40a 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java @@ -17,227 +17,31 @@ */ package org.apache.hadoop.hive.llap.daemon.impl; +import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createMockRequest; +import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createSubmitWorkRequestProto; +import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createTaskWrapper; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.PriorityBlockingQueue; import 
java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; -import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler; import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; -import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; -import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.tez.dag.records.TezDAGID; -import org.apache.tez.dag.records.TezTaskAttemptID; -import org.apache.tez.dag.records.TezTaskID; -import org.apache.tez.dag.records.TezVertexID; -import org.apache.tez.runtime.api.impl.ExecutionContextImpl; -import org.apache.tez.runtime.task.EndReason; +import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.MockRequest; +import org.apache.hadoop.hive.llap.daemon.impl.comparator.ShortestJobFirstComparator; import org.apache.tez.runtime.task.TaskRunner2Result; -import org.junit.Before; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class TestTaskExecutorService { - private static Configuration conf; - private static Credentials cred = new Credentials(); - private static final Logger LOG = LoggerFactory.getLogger(TestTaskExecutorService.class); - - @Before - public void setup() { - conf = new Configuration(); - } - - - 
@Test(timeout = 5000) - public void testWaitQueueComparator() throws InterruptedException { - TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), false, 100000); - TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), false, 100000); - TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), false, 1000000); - TaskWrapper r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), false, 1000000); - TaskWrapper r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), false, 1000000); - EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( - new TaskExecutorService.ShortestJobFirstComparator(), 4); - assertNull(queue.offer(r1)); - assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); - assertEquals(r1, queue.peek()); - assertNull(queue.offer(r3)); - assertEquals(r1, queue.peek()); - assertNull(queue.offer(r4)); - assertEquals(r1, queue.peek()); - // this offer will be rejected - assertEquals(r5, queue.offer(r5)); - assertEquals(r1, queue.take()); - assertEquals(r2, queue.take()); - assertEquals(r3, queue.take()); - assertEquals(r4, queue.take()); - - r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), true, 100000); - r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), true, 100000); - r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), true, 1000000); - r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), true, 1000000); - r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000); - queue = new EvictingPriorityBlockingQueue( - new TaskExecutorService.ShortestJobFirstComparator(), 4); - assertNull(queue.offer(r1)); - assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); - assertEquals(r1, queue.peek()); - assertNull(queue.offer(r3)); - assertEquals(r1, queue.peek()); - assertNull(queue.offer(r4)); - assertEquals(r1, queue.peek()); - // this offer will be rejected - assertEquals(r5, 
queue.offer(r5)); - assertEquals(r1, queue.take()); - assertEquals(r2, queue.take()); - assertEquals(r3, queue.take()); - assertEquals(r4, queue.take()); - - r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 100), true, 100000); - r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 200), false, 100000); - r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 300), true, 1000000); - r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 1, 400), false, 1000000); - r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000); - queue = new EvictingPriorityBlockingQueue( - new TaskExecutorService.ShortestJobFirstComparator(), 4); - assertNull(queue.offer(r1)); - assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); - assertEquals(r1, queue.peek()); - assertNull(queue.offer(r3)); - assertEquals(r1, queue.peek()); - assertNull(queue.offer(r4)); - assertEquals(r1, queue.peek()); - // offer accepted and r4 gets evicted - assertEquals(r4, queue.offer(r5)); - assertEquals(r1, queue.take()); - assertEquals(r3, queue.take()); - assertEquals(r5, queue.take()); - assertEquals(r2, queue.take()); - - r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), true, 100000); - r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), false, 100000); - r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), true, 1000000); - r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), false, 1000000); - r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000); - queue = new EvictingPriorityBlockingQueue( - new TaskExecutorService.ShortestJobFirstComparator(), 4); - assertNull(queue.offer(r1)); - assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); - assertEquals(r1, queue.peek()); - assertNull(queue.offer(r3)); - assertEquals(r1, queue.peek()); - assertNull(queue.offer(r4)); - assertEquals(r1, queue.peek()); - // offer accepted and r4 gets evicted - assertEquals(r4, queue.offer(r5)); 
- assertEquals(r1, queue.take()); - assertEquals(r3, queue.take()); - assertEquals(r5, queue.take()); - assertEquals(r2, queue.take()); - - r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), true, 100000); - r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), false, 100000); - r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), false, 1000000); - r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), false, 1000000); - r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000); - queue = new EvictingPriorityBlockingQueue( - new TaskExecutorService.ShortestJobFirstComparator(), 4); - assertNull(queue.offer(r1)); - assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); - assertEquals(r1, queue.peek()); - assertNull(queue.offer(r3)); - assertEquals(r1, queue.peek()); - assertNull(queue.offer(r4)); - assertEquals(r1, queue.peek()); - // offer accepted and r4 gets evicted - assertEquals(r4, queue.offer(r5)); - assertEquals(r1, queue.take()); - assertEquals(r5, queue.take()); - assertEquals(r2, queue.take()); - assertEquals(r3, queue.take()); - - r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), false, 100000); - r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), true, 100000); - r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), true, 1000000); - r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), true, 1000000); - r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000); - queue = new EvictingPriorityBlockingQueue( - new TaskExecutorService.ShortestJobFirstComparator(), 4); - assertNull(queue.offer(r1)); - assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); - assertEquals(r2, queue.peek()); - assertNull(queue.offer(r3)); - assertEquals(r2, queue.peek()); - assertNull(queue.offer(r4)); - assertEquals(r2, queue.peek()); - // offer accepted, r1 evicted - assertEquals(r1, queue.offer(r5)); - assertEquals(r2, 
queue.take()); - assertEquals(r3, queue.take()); - assertEquals(r4, queue.take()); - assertEquals(r5, queue.take()); - } - - @Test(timeout = 5000) - public void testWaitQueueComparatorWithinDagPriority() throws InterruptedException { - TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 0, 100, 10), false, 100000); - TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 0, 100, 1), false, 100000); - TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 0, 100, 5), false, 100000); - - EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( - new TaskExecutorService.ShortestJobFirstComparator(), 4); - - assertNull(queue.offer(r1)); - assertNull(queue.offer(r2)); - assertNull(queue.offer(r3)); - - assertEquals(r2, queue.take()); - assertEquals(r3, queue.take()); - assertEquals(r1, queue.take()); - } - - @Test(timeout = 5000) - public void testWaitQueueComparatorParallelism() throws InterruptedException { - TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 3, 100, 1), false, 100000); // 7 pending - TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 10, 7, 100, 1), false, 100000); // 3 pending - TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 10, 5, 100, 1), false, 100000); // 5 pending - - EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( - new TaskExecutorService.ShortestJobFirstComparator(), 4); - - assertNull(queue.offer(r1)); - assertNull(queue.offer(r2)); - assertNull(queue.offer(r3)); - - assertEquals(r2, queue.take()); - assertEquals(r3, queue.take()); - assertEquals(r1, queue.take()); - } @Test(timeout = 5000) public void testPreemptionQueueComparator() throws InterruptedException { @@ -265,8 +69,9 @@ public void testPreemptionQueueComparator() throws InterruptedException { public void testFinishablePreeptsNonFinishable() throws InterruptedException { MockRequest r1 = createMockRequest(1, 1, 100, false, 5000l); 
MockRequest r2 = createMockRequest(2, 1, 100, true, 1000l); - TaskExecutorServiceForTest taskExecutorService = new TaskExecutorServiceForTest(1, 2, false, true); - taskExecutorService.init(conf); + TaskExecutorServiceForTest taskExecutorService = new TaskExecutorServiceForTest(1, 2, + ShortestJobFirstComparator.class.getName(), true); + taskExecutorService.init(new Configuration()); taskExecutorService.start(); try { @@ -306,8 +111,8 @@ public void testWaitQueuePreemption() throws InterruptedException { MockRequest r5 = createMockRequest(5, 1, 500, true, 20000l); TaskExecutorServiceForTest taskExecutorService = - new TaskExecutorServiceForTest(1, 2, false, true); - taskExecutorService.init(conf); + new TaskExecutorServiceForTest(1, 2, ShortestJobFirstComparator.class.getName(), true); + taskExecutorService.init(new Configuration()); taskExecutorService.start(); try { @@ -379,197 +184,12 @@ public void testWaitQueuePreemption() throws InterruptedException { } - // ----------- Helper classes and methods go after this point. 
Tests above this ----------- - - // Create requests with the same within dag priority - private SubmitWorkRequestProto createSubmitWorkRequestProto(int fragmentNumber, int selfAndUpstreamParallelism, - long attemptStartTime) { - return createSubmitWorkRequestProto(fragmentNumber, selfAndUpstreamParallelism, 0, attemptStartTime, 1); - } - - private SubmitWorkRequestProto createSubmitWorkRequestProto(int fragmentNumber, int selfAndUpstreamParallelism, - int selfAndUpstreamComplete, - long attemptStartTime, int withinDagPriority) { - ApplicationId appId = ApplicationId.newInstance(9999, 72); - TezDAGID dagId = TezDAGID.getInstance(appId, 1); - TezVertexID vId = TezVertexID.getInstance(dagId, 35); - TezTaskID tId = TezTaskID.getInstance(vId, 389); - TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tId, fragmentNumber); - return SubmitWorkRequestProto - .newBuilder() - .setFragmentSpec( - FragmentSpecProto - .newBuilder() - .setAttemptNumber(0) - .setDagName("MockDag") - .setFragmentNumber(fragmentNumber) - .setVertexName("MockVertex") - .setProcessorDescriptor( - EntityDescriptorProto.newBuilder().setClassName("MockProcessor").build()) - .setFragmentIdentifierString(taId.toString()).build()).setAmHost("localhost") - .setAmPort(12345).setAppAttemptNumber(0).setApplicationIdString("MockApp_1") - .setContainerIdString("MockContainer_1").setUser("MockUser") - .setTokenIdentifier("MockToken_1") - .setFragmentRuntimeInfo(LlapDaemonProtocolProtos - .FragmentRuntimeInfo - .newBuilder() - .setFirstAttemptStartTime(attemptStartTime) - .setNumSelfAndUpstreamTasks(selfAndUpstreamParallelism) - .setNumSelfAndUpstreamCompletedTasks(selfAndUpstreamComplete) - .setWithinDagPriority(withinDagPriority) - .build()) - .build(); - } - - private MockRequest createMockRequest(int fragmentNum, int parallelism, long startTime, - boolean canFinish, long workTime) { - SubmitWorkRequestProto requestProto = createSubmitWorkRequestProto(fragmentNum, parallelism, - startTime); - MockRequest 
mockRequest = new MockRequest(requestProto, canFinish, workTime); - return mockRequest; - } - - private TaskWrapper createTaskWrapper(SubmitWorkRequestProto request, boolean canFinish, int workTime) { - MockRequest mockRequest = new MockRequest(request, canFinish, workTime); - TaskWrapper taskWrapper = new TaskWrapper(mockRequest, null); - return taskWrapper; - } - - private static void logInfo(String message, Throwable t) { - LOG.info(message, t); - } - - private static void logInfo(String message) { - logInfo(message, null); - } - - private static class MockRequest extends TaskRunnerCallable { - private final long workTime; - private final boolean canFinish; - - private final AtomicBoolean isStarted = new AtomicBoolean(false); - private final AtomicBoolean isFinished = new AtomicBoolean(false); - private final AtomicBoolean wasKilled = new AtomicBoolean(false); - private final AtomicBoolean wasInterrupted = new AtomicBoolean(false); - - private final ReentrantLock lock = new ReentrantLock(); - private final Condition startedCondition = lock.newCondition(); - private final Condition sleepCondition = lock.newCondition(); - private final Condition finishedCondition = lock.newCondition(); - - public MockRequest(LlapDaemonProtocolProtos.SubmitWorkRequestProto requestProto, - boolean canFinish, long workTime) { - super(requestProto, mock(QueryFragmentInfo.class), conf, - new ExecutionContextImpl("localhost"), null, cred, 0, null, null, mock( - LlapDaemonExecutorMetrics.class), - mock(KilledTaskHandler.class), mock( - FragmentCompletionHandler.class)); - this.workTime = workTime; - this.canFinish = canFinish; - } - @Override - protected TaskRunner2Result callInternal() { - try { - logInfo(super.getRequestId() + " is executing..", null); - lock.lock(); - try { - isStarted.set(true); - startedCondition.signal(); - } finally { - lock.unlock(); - } - - lock.lock(); - try { - sleepCondition.await(workTime, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - 
wasInterrupted.set(true); - return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false); - } finally { - lock.unlock(); - } - if (wasKilled.get()) { - return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false); - } else { - return new TaskRunner2Result(EndReason.SUCCESS, null, false); - } - } finally { - lock.lock(); - try { - isFinished.set(true); - finishedCondition.signal(); - } finally { - lock.unlock(); - } - } - } - - @Override - public void killTask() { - lock.lock(); - try { - wasKilled.set(true); - sleepCondition.signal(); - } finally { - lock.unlock(); - } - } - - boolean hasStarted() { - return isStarted.get(); - } - - boolean hasFinished() { - return isFinished.get(); - } - - boolean wasPreempted() { - return wasKilled.get(); - } - - void complete() { - lock.lock(); - try { - sleepCondition.signal(); - } finally { - lock.unlock(); - } - } - - void awaitStart() throws InterruptedException { - lock.lock(); - try { - while (!isStarted.get()) { - startedCondition.await(); - } - } finally { - lock.unlock(); - } - } - - void awaitEnd() throws InterruptedException { - lock.lock(); - try { - while (!isFinished.get()) { - finishedCondition.await(); - } - } finally { - lock.unlock(); - } - } - - - @Override - public boolean canFinish() { - return canFinish; - } - } private static class TaskExecutorServiceForTest extends TaskExecutorService { - public TaskExecutorServiceForTest(int numExecutors, int waitQueueSize, boolean useFairOrdering, + public TaskExecutorServiceForTest(int numExecutors, int waitQueueSize, String waitQueueComparatorClassName, boolean enablePreemption) { - super(numExecutors, waitQueueSize, useFairOrdering, enablePreemption); + super(numExecutors, waitQueueSize, waitQueueComparatorClassName, enablePreemption); } private ConcurrentMap completionListeners = new ConcurrentHashMap<>(); diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java 
llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java deleted file mode 100644 index 1929439..0000000 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java +++ /dev/null @@ -1,324 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.llap.daemon.impl; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.mockito.Mockito.mock; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; -import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler; -import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; -import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; -import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.tez.dag.records.TezDAGID; -import org.apache.tez.dag.records.TezTaskAttemptID; -import org.apache.tez.dag.records.TezTaskID; -import org.apache.tez.dag.records.TezVertexID; -import org.apache.tez.runtime.api.impl.ExecutionContextImpl; -import org.apache.tez.runtime.task.EndReason; -import org.apache.tez.runtime.task.TaskRunner2Result; -import org.junit.Before; -import org.junit.Test; - -public class TestTaskExecutorService2 { - private static Configuration conf; - private static Credentials cred = new Credentials(); - - private static class MockRequest extends TaskRunnerCallable { - private int workTime; - private boolean canFinish; - - public MockRequest(SubmitWorkRequestProto requestProto, - boolean canFinish, int workTime) { - 
super(requestProto, mock(QueryFragmentInfo.class), conf, - new ExecutionContextImpl("localhost"), null, cred, 0, null, null, null, - mock(KilledTaskHandler.class), mock( - FragmentCompletionHandler.class)); - this.workTime = workTime; - this.canFinish = canFinish; - } - - @Override - protected TaskRunner2Result callInternal() { - System.out.println(super.getRequestId() + " is executing.."); - try { - Thread.sleep(workTime); - } catch (InterruptedException e) { - return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false); - } - return new TaskRunner2Result(EndReason.SUCCESS, null, false); - } - - @Override - public boolean canFinish() { - return canFinish; - } - } - - @Before - public void setup() { - conf = new Configuration(); - } - - private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndUpstreamTasks, int dagStartTime, - int attemptStartTime) { - // Same priority for all tasks. - return createRequest(fragmentNumber, numSelfAndUpstreamTasks, 0, dagStartTime, attemptStartTime, 1); - } - - private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndUpstreamTasks, - int numSelfAndUpstreamComplete, int dagStartTime, - int attemptStartTime, int withinDagPriority) { - ApplicationId appId = ApplicationId.newInstance(9999, 72); - TezDAGID dagId = TezDAGID.getInstance(appId, 1); - TezVertexID vId = TezVertexID.getInstance(dagId, 35); - TezTaskID tId = TezTaskID.getInstance(vId, 389); - TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tId, fragmentNumber); - return SubmitWorkRequestProto - .newBuilder() - .setFragmentSpec( - FragmentSpecProto - .newBuilder() - .setAttemptNumber(0) - .setDagName("MockDag") - .setFragmentNumber(fragmentNumber) - .setVertexName("MockVertex") - .setProcessorDescriptor( - EntityDescriptorProto.newBuilder().setClassName("MockProcessor").build()) - .setFragmentIdentifierString(taId.toString()).build()).setAmHost("localhost") - 
.setAmPort(12345).setAppAttemptNumber(0).setApplicationIdString("MockApp_1") - .setContainerIdString("MockContainer_1").setUser("MockUser") - .setTokenIdentifier("MockToken_1") - .setFragmentRuntimeInfo(LlapDaemonProtocolProtos - .FragmentRuntimeInfo - .newBuilder() - .setDagStartTime(dagStartTime) - .setFirstAttemptStartTime(attemptStartTime) - .setNumSelfAndUpstreamTasks(numSelfAndUpstreamTasks) - .setNumSelfAndUpstreamCompletedTasks(numSelfAndUpstreamComplete) - .setWithinDagPriority(withinDagPriority) - .build()) - .build(); - } - - @Test - public void testWaitQueueComparator() throws InterruptedException { - TaskWrapper r1 = createTaskWrapper(createRequest(1, 2, 5, 100), false, 100000); - TaskWrapper r2 = createTaskWrapper(createRequest(2, 4, 4, 200), false, 100000); - TaskWrapper r3 = createTaskWrapper(createRequest(3, 6, 3, 300), false, 1000000); - TaskWrapper r4 = createTaskWrapper(createRequest(4, 8, 2, 400), false, 1000000); - TaskWrapper r5 = createTaskWrapper(createRequest(5, 10, 1, 500), false, 1000000); - EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( - new TaskExecutorService.FirstInFirstOutComparator(), 4); - assertNull(queue.offer(r1)); - assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); - assertEquals(r2, queue.peek()); - assertNull(queue.offer(r3)); - assertEquals(r3, queue.peek()); - assertNull(queue.offer(r4)); - assertEquals(r4, queue.peek()); - // this offer will be accepted and r1 evicted - assertEquals(r1, queue.offer(r5)); - assertEquals(r5, queue.take()); - assertEquals(r4, queue.take()); - assertEquals(r3, queue.take()); - assertEquals(r2, queue.take()); - - r1 = createTaskWrapper(createRequest(1, 2, 5, 100), true, 100000); - r2 = createTaskWrapper(createRequest(2, 4, 4, 200), true, 100000); - r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000); - r4 = createTaskWrapper(createRequest(4, 8, 2, 400), true, 1000000); - r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 
1000000); - queue = new EvictingPriorityBlockingQueue( - new TaskExecutorService.FirstInFirstOutComparator(), 4); - assertNull(queue.offer(r1)); - assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); - assertEquals(r2, queue.peek()); - assertNull(queue.offer(r3)); - assertEquals(r3, queue.peek()); - assertNull(queue.offer(r4)); - assertEquals(r4, queue.peek()); - // this offer will be accpeted and r1 evicted - assertEquals(r1, queue.offer(r5)); - assertEquals(r5, queue.take()); - assertEquals(r4, queue.take()); - assertEquals(r3, queue.take()); - assertEquals(r2, queue.take()); - - r1 = createTaskWrapper(createRequest(1, 1, 5, 100), true, 100000); - r2 = createTaskWrapper(createRequest(2, 1, 4, 200), false, 100000); - r3 = createTaskWrapper(createRequest(3, 1, 3, 300), true, 1000000); - r4 = createTaskWrapper(createRequest(4, 1, 2, 400), false, 1000000); - r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000); - queue = new EvictingPriorityBlockingQueue( - new TaskExecutorService.FirstInFirstOutComparator(), 4); - assertNull(queue.offer(r1)); - assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); - assertEquals(r1, queue.peek()); - assertNull(queue.offer(r3)); - assertEquals(r3, queue.peek()); - assertNull(queue.offer(r4)); - assertEquals(r3, queue.peek()); - // offer accepted and r2 gets evicted - assertEquals(r2, queue.offer(r5)); - assertEquals(r5, queue.take()); - assertEquals(r3, queue.take()); - assertEquals(r1, queue.take()); - assertEquals(r4, queue.take()); - - r1 = createTaskWrapper(createRequest(1, 2, 5, 100), true, 100000); - r2 = createTaskWrapper(createRequest(2, 4, 4, 200), false, 100000); - r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000); - r4 = createTaskWrapper(createRequest(4, 8, 2, 400), false, 1000000); - r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000); - queue = new EvictingPriorityBlockingQueue( - new TaskExecutorService.FirstInFirstOutComparator(), 4); - 
assertNull(queue.offer(r1)); - assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); - assertEquals(r1, queue.peek()); - assertNull(queue.offer(r3)); - assertEquals(r3, queue.peek()); - assertNull(queue.offer(r4)); - assertEquals(r3, queue.peek()); - // offer accepted and r2 gets evicted - assertEquals(r2, queue.offer(r5)); - assertEquals(r5, queue.take()); - assertEquals(r3, queue.take()); - assertEquals(r1, queue.take()); - assertEquals(r4, queue.take()); - - r1 = createTaskWrapper(createRequest(1, 2, 5, 100), true, 100000); - r2 = createTaskWrapper(createRequest(2, 4, 4, 200), false, 100000); - r3 = createTaskWrapper(createRequest(3, 6, 3, 300), false, 1000000); - r4 = createTaskWrapper(createRequest(4, 8, 2, 400), false, 1000000); - r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000); - queue = new EvictingPriorityBlockingQueue( - new TaskExecutorService.FirstInFirstOutComparator(), 4); - assertNull(queue.offer(r1)); - assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); - assertEquals(r1, queue.peek()); - assertNull(queue.offer(r3)); - assertEquals(r1, queue.peek()); - assertNull(queue.offer(r4)); - assertEquals(r1, queue.peek()); - // offer accepted and r2 gets evicted - assertEquals(r2, queue.offer(r5)); - assertEquals(r5, queue.take()); - assertEquals(r1, queue.take()); - assertEquals(r4, queue.take()); - assertEquals(r3, queue.take()); - - r1 = createTaskWrapper(createRequest(1, 2, 5, 100), false, 100000); - r2 = createTaskWrapper(createRequest(2, 4, 4, 200), true, 100000); - r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000); - r4 = createTaskWrapper(createRequest(4, 8, 2, 400), true, 1000000); - r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000); - queue = new EvictingPriorityBlockingQueue( - new TaskExecutorService.FirstInFirstOutComparator(), 4); - assertNull(queue.offer(r1)); - assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); - assertEquals(r2, queue.peek()); - 
assertNull(queue.offer(r3)); - assertEquals(r3, queue.peek()); - assertNull(queue.offer(r4)); - assertEquals(r4, queue.peek()); - // offer accepted, r1 evicted - assertEquals(r1, queue.offer(r5)); - assertEquals(r5, queue.take()); - assertEquals(r4, queue.take()); - assertEquals(r3, queue.take()); - assertEquals(r2, queue.take()); - - r1 = createTaskWrapper(createRequest(1, 2, 5, 100), false, 100000); - r2 = createTaskWrapper(createRequest(2, 4, 4, 200), true, 100000); - r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000); - r4 = createTaskWrapper(createRequest(4, 8, 2, 400), true, 1000000); - r5 = createTaskWrapper(createRequest(5, 10, 2, 500), true, 1000000); - queue = new EvictingPriorityBlockingQueue( - new TaskExecutorService.FirstInFirstOutComparator(), 4); - assertNull(queue.offer(r1)); - assertEquals(r1, queue.peek()); - assertNull(queue.offer(r2)); - assertEquals(r2, queue.peek()); - assertNull(queue.offer(r3)); - assertEquals(r3, queue.peek()); - assertNull(queue.offer(r4)); - assertEquals(r4, queue.peek()); - // offer accepted, r1 evicted - assertEquals(r1, queue.offer(r5)); - assertEquals(r4, queue.take()); - assertEquals(r5, queue.take()); - assertEquals(r3, queue.take()); - assertEquals(r2, queue.take()); - } - - @Test(timeout = 5000) - public void testWaitQueueComparatorWithinDagPriority() throws InterruptedException { - TaskWrapper r1 = createTaskWrapper(createRequest(1, 1, 0, 100, 100, 10), false, 100000); - TaskWrapper r2 = createTaskWrapper(createRequest(2, 1, 0, 100, 100, 1), false, 100000); - TaskWrapper r3 = createTaskWrapper(createRequest(3, 1, 0, 100, 100, 5), false, 100000); - - EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( - new TaskExecutorService.ShortestJobFirstComparator(), 4); - - assertNull(queue.offer(r1)); - assertNull(queue.offer(r2)); - assertNull(queue.offer(r3)); - - assertEquals(r2, queue.take()); - assertEquals(r3, queue.take()); - assertEquals(r1, queue.take()); - } - - 
@Test(timeout = 5000) - public void testWaitQueueComparatorParallelism() throws InterruptedException { - TaskWrapper r1 = createTaskWrapper(createRequest(1, 10, 3, 100, 100, 1), false, 100000); - TaskWrapper r2 = createTaskWrapper(createRequest(2, 10, 7, 100, 100, 1), false, 100000); - TaskWrapper r3 = createTaskWrapper(createRequest(3, 10, 5, 100, 100, 1), false, 100000); - - EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( - new TaskExecutorService.ShortestJobFirstComparator(), 4); - - assertNull(queue.offer(r1)); - assertNull(queue.offer(r2)); - assertNull(queue.offer(r3)); - - assertEquals(r2, queue.take()); - assertEquals(r3, queue.take()); - assertEquals(r1, queue.take()); - } - - - private TaskWrapper createTaskWrapper(SubmitWorkRequestProto request, boolean canFinish, int workTime) { - MockRequest mockRequest = new MockRequest(request, canFinish, workTime); - TaskWrapper taskWrapper = new TaskWrapper(mockRequest, null); - return taskWrapper; - } -} diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java new file mode 100644 index 0000000..ebfb430 --- /dev/null +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java @@ -0,0 +1,321 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.daemon.impl.comparator; + +import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createTaskWrapper; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; +import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler; +import org.apache.hadoop.hive.llap.daemon.impl.EvictingPriorityBlockingQueue; +import org.apache.hadoop.hive.llap.daemon.impl.QueryFragmentInfo; +import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper; +import org.apache.hadoop.hive.llap.daemon.impl.TaskRunnerCallable; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.runtime.api.impl.ExecutionContextImpl; +import org.apache.tez.runtime.task.EndReason; +import org.apache.tez.runtime.task.TaskRunner2Result; +import org.junit.Before; +import org.junit.Test; + +public class 
TestFirstInFirstOutComparator { + private static Configuration conf; + private static Credentials cred = new Credentials(); + + private static class MockRequest extends TaskRunnerCallable { + private int workTime; + private boolean canFinish; + + public MockRequest(SubmitWorkRequestProto requestProto, + boolean canFinish, int workTime) { + super(requestProto, mock(QueryFragmentInfo.class), conf, + new ExecutionContextImpl("localhost"), null, cred, 0, null, null, null, + mock(KilledTaskHandler.class), mock( + FragmentCompletionHandler.class)); + this.workTime = workTime; + this.canFinish = canFinish; + } + + @Override + protected TaskRunner2Result callInternal() { + System.out.println(super.getRequestId() + " is executing.."); + try { + Thread.sleep(workTime); + } catch (InterruptedException e) { + return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false); + } + return new TaskRunner2Result(EndReason.SUCCESS, null, false); + } + + @Override + public boolean canFinish() { + return canFinish; + } + } + + @Before + public void setup() { + conf = new Configuration(); + } + + private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndUpstreamTasks, int dagStartTime, + int attemptStartTime) { + // Same priority for all tasks. 
+ return createRequest(fragmentNumber, numSelfAndUpstreamTasks, 0, dagStartTime, attemptStartTime, 1); + } + + private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndUpstreamTasks, + int numSelfAndUpstreamComplete, int dagStartTime, + int attemptStartTime, int withinDagPriority) { + ApplicationId appId = ApplicationId.newInstance(9999, 72); + TezDAGID dagId = TezDAGID.getInstance(appId, 1); + TezVertexID vId = TezVertexID.getInstance(dagId, 35); + TezTaskID tId = TezTaskID.getInstance(vId, 389); + TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tId, fragmentNumber); + return SubmitWorkRequestProto + .newBuilder() + .setFragmentSpec( + FragmentSpecProto + .newBuilder() + .setAttemptNumber(0) + .setDagName("MockDag") + .setFragmentNumber(fragmentNumber) + .setVertexName("MockVertex") + .setProcessorDescriptor( + EntityDescriptorProto.newBuilder().setClassName("MockProcessor").build()) + .setFragmentIdentifierString(taId.toString()).build()).setAmHost("localhost") + .setAmPort(12345).setAppAttemptNumber(0).setApplicationIdString("MockApp_1") + .setContainerIdString("MockContainer_1").setUser("MockUser") + .setTokenIdentifier("MockToken_1") + .setFragmentRuntimeInfo(LlapDaemonProtocolProtos + .FragmentRuntimeInfo + .newBuilder() + .setDagStartTime(dagStartTime) + .setFirstAttemptStartTime(attemptStartTime) + .setNumSelfAndUpstreamTasks(numSelfAndUpstreamTasks) + .setNumSelfAndUpstreamCompletedTasks(numSelfAndUpstreamComplete) + .setWithinDagPriority(withinDagPriority) + .build()) + .build(); + } + + @Test + public void testWaitQueueComparator() throws InterruptedException { + TaskWrapper r1 = createTaskWrapper(createRequest(1, 2, 5, 100), false, 100000); + TaskWrapper r2 = createTaskWrapper(createRequest(2, 4, 4, 200), false, 100000); + TaskWrapper r3 = createTaskWrapper(createRequest(3, 6, 3, 300), false, 1000000); + TaskWrapper r4 = createTaskWrapper(createRequest(4, 8, 2, 400), false, 1000000); + TaskWrapper r5 = 
createTaskWrapper(createRequest(5, 10, 1, 500), false, 1000000); + EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( + new FirstInFirstOutComparator(), 4); + assertNull(queue.offer(r1)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r2)); + assertEquals(r2, queue.peek()); + assertNull(queue.offer(r3)); + assertEquals(r3, queue.peek()); + assertNull(queue.offer(r4)); + assertEquals(r4, queue.peek()); + // this offer will be accepted and r1 evicted + assertEquals(r1, queue.offer(r5)); + assertEquals(r5, queue.take()); + assertEquals(r4, queue.take()); + assertEquals(r3, queue.take()); + assertEquals(r2, queue.take()); + + r1 = createTaskWrapper(createRequest(1, 2, 5, 100), true, 100000); + r2 = createTaskWrapper(createRequest(2, 4, 4, 200), true, 100000); + r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000); + r4 = createTaskWrapper(createRequest(4, 8, 2, 400), true, 1000000); + r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000); + queue = new EvictingPriorityBlockingQueue( + new FirstInFirstOutComparator(), 4); + assertNull(queue.offer(r1)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r2)); + assertEquals(r2, queue.peek()); + assertNull(queue.offer(r3)); + assertEquals(r3, queue.peek()); + assertNull(queue.offer(r4)); + assertEquals(r4, queue.peek()); + // this offer will be accpeted and r1 evicted + assertEquals(r1, queue.offer(r5)); + assertEquals(r5, queue.take()); + assertEquals(r4, queue.take()); + assertEquals(r3, queue.take()); + assertEquals(r2, queue.take()); + + r1 = createTaskWrapper(createRequest(1, 1, 5, 100), true, 100000); + r2 = createTaskWrapper(createRequest(2, 1, 4, 200), false, 100000); + r3 = createTaskWrapper(createRequest(3, 1, 3, 300), true, 1000000); + r4 = createTaskWrapper(createRequest(4, 1, 2, 400), false, 1000000); + r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000); + queue = new EvictingPriorityBlockingQueue( + new 
FirstInFirstOutComparator(), 4); + assertNull(queue.offer(r1)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r2)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r3)); + assertEquals(r3, queue.peek()); + assertNull(queue.offer(r4)); + assertEquals(r3, queue.peek()); + // offer accepted and r2 gets evicted + assertEquals(r2, queue.offer(r5)); + assertEquals(r5, queue.take()); + assertEquals(r3, queue.take()); + assertEquals(r1, queue.take()); + assertEquals(r4, queue.take()); + + r1 = createTaskWrapper(createRequest(1, 2, 5, 100), true, 100000); + r2 = createTaskWrapper(createRequest(2, 4, 4, 200), false, 100000); + r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000); + r4 = createTaskWrapper(createRequest(4, 8, 2, 400), false, 1000000); + r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000); + queue = new EvictingPriorityBlockingQueue( + new FirstInFirstOutComparator(), 4); + assertNull(queue.offer(r1)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r2)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r3)); + assertEquals(r3, queue.peek()); + assertNull(queue.offer(r4)); + assertEquals(r3, queue.peek()); + // offer accepted and r2 gets evicted + assertEquals(r2, queue.offer(r5)); + assertEquals(r5, queue.take()); + assertEquals(r3, queue.take()); + assertEquals(r1, queue.take()); + assertEquals(r4, queue.take()); + + r1 = createTaskWrapper(createRequest(1, 2, 5, 100), true, 100000); + r2 = createTaskWrapper(createRequest(2, 4, 4, 200), false, 100000); + r3 = createTaskWrapper(createRequest(3, 6, 3, 300), false, 1000000); + r4 = createTaskWrapper(createRequest(4, 8, 2, 400), false, 1000000); + r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000); + queue = new EvictingPriorityBlockingQueue( + new FirstInFirstOutComparator(), 4); + assertNull(queue.offer(r1)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r2)); + assertEquals(r1, queue.peek()); + 
assertNull(queue.offer(r3)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r4)); + assertEquals(r1, queue.peek()); + // offer accepted and r2 gets evicted + assertEquals(r2, queue.offer(r5)); + assertEquals(r5, queue.take()); + assertEquals(r1, queue.take()); + assertEquals(r4, queue.take()); + assertEquals(r3, queue.take()); + + r1 = createTaskWrapper(createRequest(1, 2, 5, 100), false, 100000); + r2 = createTaskWrapper(createRequest(2, 4, 4, 200), true, 100000); + r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000); + r4 = createTaskWrapper(createRequest(4, 8, 2, 400), true, 1000000); + r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000); + queue = new EvictingPriorityBlockingQueue( + new FirstInFirstOutComparator(), 4); + assertNull(queue.offer(r1)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r2)); + assertEquals(r2, queue.peek()); + assertNull(queue.offer(r3)); + assertEquals(r3, queue.peek()); + assertNull(queue.offer(r4)); + assertEquals(r4, queue.peek()); + // offer accepted, r1 evicted + assertEquals(r1, queue.offer(r5)); + assertEquals(r5, queue.take()); + assertEquals(r4, queue.take()); + assertEquals(r3, queue.take()); + assertEquals(r2, queue.take()); + + r1 = createTaskWrapper(createRequest(1, 2, 5, 100), false, 100000); + r2 = createTaskWrapper(createRequest(2, 4, 4, 200), true, 100000); + r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000); + r4 = createTaskWrapper(createRequest(4, 8, 2, 400), true, 1000000); + r5 = createTaskWrapper(createRequest(5, 10, 2, 500), true, 1000000); + queue = new EvictingPriorityBlockingQueue( + new FirstInFirstOutComparator(), 4); + assertNull(queue.offer(r1)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r2)); + assertEquals(r2, queue.peek()); + assertNull(queue.offer(r3)); + assertEquals(r3, queue.peek()); + assertNull(queue.offer(r4)); + assertEquals(r4, queue.peek()); + // offer accepted, r1 evicted + assertEquals(r1, 
queue.offer(r5)); + assertEquals(r4, queue.take()); + assertEquals(r5, queue.take()); + assertEquals(r3, queue.take()); + assertEquals(r2, queue.take()); + } + + @Test(timeout = 5000) + public void testWaitQueueComparatorWithinDagPriority() throws InterruptedException { + TaskWrapper r1 = createTaskWrapper(createRequest(1, 1, 0, 100, 100, 10), false, 100000); + TaskWrapper r2 = createTaskWrapper(createRequest(2, 1, 0, 100, 100, 1), false, 100000); + TaskWrapper r3 = createTaskWrapper(createRequest(3, 1, 0, 100, 100, 5), false, 100000); + + EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( + new FirstInFirstOutComparator(), 4); + + assertNull(queue.offer(r1)); + assertNull(queue.offer(r2)); + assertNull(queue.offer(r3)); + + assertEquals(r2, queue.take()); + assertEquals(r3, queue.take()); + assertEquals(r1, queue.take()); + } + + @Test(timeout = 5000) + public void testWaitQueueComparatorParallelism() throws InterruptedException { + TaskWrapper r1 = createTaskWrapper(createRequest(1, 10, 3, 100, 100, 1), false, 100000); + TaskWrapper r2 = createTaskWrapper(createRequest(2, 10, 7, 100, 100, 1), false, 100000); + TaskWrapper r3 = createTaskWrapper(createRequest(3, 10, 5, 100, 100, 1), false, 100000); + + EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( + new FirstInFirstOutComparator(), 4); + + assertNull(queue.offer(r1)); + assertNull(queue.offer(r2)); + assertNull(queue.offer(r3)); + + assertEquals(r2, queue.take()); + assertEquals(r3, queue.take()); + assertEquals(r1, queue.take()); + } +} diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java new file mode 100644 index 0000000..9dafd15 --- /dev/null +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java @@ -0,0 +1,199 @@ +/* + * Licensed 
under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.daemon.impl.comparator; + +import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createSubmitWorkRequestProto; +import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createTaskWrapper; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import org.apache.hadoop.hive.llap.daemon.impl.EvictingPriorityBlockingQueue; +import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper; +import org.junit.Test; + +public class TestShortestJobFirstComparator { + + + @Test(timeout = 5000) + public void testWaitQueueComparator() throws InterruptedException { + TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), false, 100000); + TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), false, 100000); + TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), false, 1000000); + TaskWrapper r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), false, 1000000); + TaskWrapper r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), false, 1000000); + EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( + new ShortestJobFirstComparator(), 4); + assertNull(queue.offer(r1)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r2)); + assertEquals(r1, queue.peek()); + 
assertNull(queue.offer(r3)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r4)); + assertEquals(r1, queue.peek()); + // this offer will be rejected + assertEquals(r5, queue.offer(r5)); + assertEquals(r1, queue.take()); + assertEquals(r2, queue.take()); + assertEquals(r3, queue.take()); + assertEquals(r4, queue.take()); + + r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), true, 100000); + r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), true, 100000); + r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), true, 1000000); + r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), true, 1000000); + r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000); + queue = new EvictingPriorityBlockingQueue( + new ShortestJobFirstComparator(), 4); + assertNull(queue.offer(r1)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r2)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r3)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r4)); + assertEquals(r1, queue.peek()); + // this offer will be rejected + assertEquals(r5, queue.offer(r5)); + assertEquals(r1, queue.take()); + assertEquals(r2, queue.take()); + assertEquals(r3, queue.take()); + assertEquals(r4, queue.take()); + + r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 100), true, 100000); + r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 200), false, 100000); + r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 300), true, 1000000); + r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 1, 400), false, 1000000); + r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000); + queue = new EvictingPriorityBlockingQueue( + new ShortestJobFirstComparator(), 4); + assertNull(queue.offer(r1)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r2)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r3)); + assertEquals(r1, queue.peek()); + 
assertNull(queue.offer(r4)); + assertEquals(r1, queue.peek()); + // offer accepted and r4 gets evicted + assertEquals(r4, queue.offer(r5)); + assertEquals(r1, queue.take()); + assertEquals(r3, queue.take()); + assertEquals(r5, queue.take()); + assertEquals(r2, queue.take()); + + r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), true, 100000); + r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), false, 100000); + r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), true, 1000000); + r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), false, 1000000); + r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000); + queue = new EvictingPriorityBlockingQueue( + new ShortestJobFirstComparator(), 4); + assertNull(queue.offer(r1)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r2)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r3)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r4)); + assertEquals(r1, queue.peek()); + // offer accepted and r4 gets evicted + assertEquals(r4, queue.offer(r5)); + assertEquals(r1, queue.take()); + assertEquals(r3, queue.take()); + assertEquals(r5, queue.take()); + assertEquals(r2, queue.take()); + + r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), true, 100000); + r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), false, 100000); + r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), false, 1000000); + r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), false, 1000000); + r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000); + queue = new EvictingPriorityBlockingQueue( + new ShortestJobFirstComparator(), 4); + assertNull(queue.offer(r1)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r2)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r3)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r4)); + assertEquals(r1, 
queue.peek()); + // offer accepted and r4 gets evicted + assertEquals(r4, queue.offer(r5)); + assertEquals(r1, queue.take()); + assertEquals(r5, queue.take()); + assertEquals(r2, queue.take()); + assertEquals(r3, queue.take()); + + r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), false, 100000); + r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), true, 100000); + r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 6, 300), true, 1000000); + r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 8, 400), true, 1000000); + r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 10, 500), true, 1000000); + queue = new EvictingPriorityBlockingQueue( + new ShortestJobFirstComparator(), 4); + assertNull(queue.offer(r1)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r2)); + assertEquals(r2, queue.peek()); + assertNull(queue.offer(r3)); + assertEquals(r2, queue.peek()); + assertNull(queue.offer(r4)); + assertEquals(r2, queue.peek()); + // offer accepted, r1 evicted + assertEquals(r1, queue.offer(r5)); + assertEquals(r2, queue.take()); + assertEquals(r3, queue.take()); + assertEquals(r4, queue.take()); + assertEquals(r5, queue.take()); + } + + @Test(timeout = 5000) + public void testWaitQueueComparatorWithinDagPriority() throws InterruptedException { + TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 0, 100, 10), false, 100000); + TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 0, 100, 1), false, 100000); + TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 0, 100, 5), false, 100000); + + EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( + new ShortestJobFirstComparator(), 4); + + assertNull(queue.offer(r1)); + assertNull(queue.offer(r2)); + assertNull(queue.offer(r3)); + + assertEquals(r2, queue.take()); + assertEquals(r3, queue.take()); + assertEquals(r1, queue.take()); + } + + @Test(timeout = 5000) + public void 
testWaitQueueComparatorParallelism() throws InterruptedException { + TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 3, 100, 1), false, 100000); // 7 pending + TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 10, 7, 100, 1), false, 100000); // 3 pending + TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 10, 5, 100, 1), false, 100000); // 5 pending + + EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( + new ShortestJobFirstComparator(), 4); + + assertNull(queue.offer(r1)); + assertNull(queue.offer(r2)); + assertNull(queue.offer(r3)); + + assertEquals(r2, queue.take()); + assertEquals(r3, queue.take()); + assertEquals(r1, queue.take()); + } +}