diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java b/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java index f5aa2a6..b6633b8 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java @@ -140,6 +140,10 @@ public LlapConfiguration() { LLAP_DAEMON_PREFIX + "task.scheduler.wait.queue.size"; public static final int LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE_DEFAULT = 10; + public static final String LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_FAIR_ORDERING = + LLAP_DAEMON_PREFIX + "task.scheduler.wait.queue.fair.ordering"; + public static final boolean LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_FAIR_ORDERING_DEFAULT = false; + public static final String LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION = LLAP_DAEMON_PREFIX + "task.scheduler.enable.preemption"; public static final boolean LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION_DEFAULT = true; diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index e26852a..cba057c 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; import org.apache.hadoop.hive.llap.daemon.HistoryLogger; @@ -54,11 +55,11 @@ import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; - -import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + // TODO Convert this to a CompositeService public class ContainerRunnerImpl extends CompositeService implements ContainerRunner, FragmentCompletionHandler, QueryFailedHandler { @@ -93,7 +94,10 @@ public ContainerRunnerImpl(Configuration conf, int numExecutors, int waitQueueSi this.queryTracker = new QueryTracker(conf, localDirsBase); addIfService(queryTracker); - this.executorService = new TaskExecutorService(numExecutors, waitQueueSize, enablePreemption); + boolean useFairOrdering = conf.getBoolean(LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_FAIR_ORDERING, + LlapConfiguration.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_FAIR_ORDERING_DEFAULT); + this.executorService = new TaskExecutorService(numExecutors, waitQueueSize, useFairOrdering, + enablePreemption); AuxiliaryServiceHelper.setServiceDataIntoEnv( TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID, ByteBuffer.allocate(4).putInt(localShufflePort), localEnv); diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index 0fd89de..f083a48 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -100,9 +100,16 @@ private final Object lock = new Object(); - public TaskExecutorService(int numExecutors, int waitQueueSize, boolean enablePreemption) { + public TaskExecutorService(int numExecutors, int waitQueueSize, boolean useFairOrdering, + boolean enablePreemption) { super(TaskExecutorService.class.getSimpleName()); - this.waitQueue = new EvictingPriorityBlockingQueue<>(new WaitQueueComparator(), waitQueueSize); + final Comparator waitQueueComparator; + if (useFairOrdering) { + waitQueueComparator = new FirstInFirstOutComparator(); + } else { + waitQueueComparator = new ShortestJobFirstComparator(); + } + this.waitQueue = new EvictingPriorityBlockingQueue<>(waitQueueComparator, waitQueueSize); this.threadPoolExecutor = new ThreadPoolExecutor(numExecutors, // core pool size numExecutors, // max pool size 1, TimeUnit.MINUTES, @@ -540,8 +547,10 @@ private void shutdownExecutor(ExecutorService executorService) { } } + // if map tasks and reduce tasks are in finishable state then priority is given to the task + // that has less number of pending tasks (shortest job) @VisibleForTesting - public static class WaitQueueComparator implements Comparator { + public static class ShortestJobFirstComparator implements Comparator { @Override public int compare(TaskWrapper t1, TaskWrapper t2) { @@ -570,6 +579,48 @@ public int compare(TaskWrapper t1, TaskWrapper t2) { } } + // if map tasks and reduce tasks are in finishable state then priority is given to the task in + // the following order + // 1) Dag start time + // 2) Attempt start time + // 3) Vertex parallelism + @VisibleForTesting + public static class FirstInFirstOutComparator implements Comparator { + + @Override + public int compare(TaskWrapper t1, TaskWrapper t2) { + TaskRunnerCallable o1 = t1.getTaskRunnerCallable(); + TaskRunnerCallable o2 = t2.getTaskRunnerCallable(); + boolean newCanFinish = o1.canFinish(); + boolean oldCanFinish = o2.canFinish(); + if (newCanFinish == true && oldCanFinish == false) { + return -1; + } else if (newCanFinish == false && oldCanFinish == true) { + return 1; + } + + if (o1.getDagStartTime() < o2.getDagStartTime()) { + return -1; + } else if (o1.getDagStartTime() > o2.getDagStartTime()) { + return 1; + } + + if (o1.getFirstAttemptStartTime() < o2.getFirstAttemptStartTime()) { + return -1; + } else if (o1.getFirstAttemptStartTime() > o2.getFirstAttemptStartTime()) { + return 1; + } + + if (o1.getVertexParallelism() < o2.getVertexParallelism()) { + return -1; + } else if (o1.getVertexParallelism() > o2.getVertexParallelism()) { + return 1; + } + + return 0; + } + } + @VisibleForTesting public static class PreemptionQueueComparator implements Comparator { diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index 9b14fa3..583c6a9 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -472,8 +472,8 @@ public long getFirstAttemptStartTime() { return request.getFragmentRuntimeInfo().getFirstAttemptStartTime(); } - public long getCurrentAttemptStartTime() { - return request.getFragmentRuntimeInfo().getCurrentAttemptStartTime(); + public long getDagStartTime() { + return request.getFragmentRuntimeInfo().getDagStartTime(); } } diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java index 6b6fac0..dd5b457 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java @@ -113,7 +113,7 @@ private SubmitWorkRequestProto createRequest(int fragmentNumber, int parallelism .build()) .build(); } - + @Test public void testWaitQueueComparator() throws InterruptedException { TaskWrapper r1 = createTaskWrapper(createRequest(1, 2, 100), false, 100000); @@ -122,7 +122,7 @@ public void testWaitQueueComparator() throws InterruptedException { TaskWrapper r4 = createTaskWrapper(createRequest(4, 8, 400), false, 1000000); TaskWrapper r5 = createTaskWrapper(createRequest(5, 10, 500), false, 1000000); EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( - new TaskExecutorService.WaitQueueComparator(), 4); + new TaskExecutorService.ShortestJobFirstComparator(), 4); assertNull(queue.offer(r1)); assertEquals(r1, queue.peek()); assertNull(queue.offer(r2)); @@ -144,7 +144,7 @@ public void testWaitQueueComparator() throws InterruptedException { r4 = createTaskWrapper(createRequest(4, 8, 400), true, 1000000); r5 = createTaskWrapper(createRequest(5, 10, 500), true, 1000000); queue = new EvictingPriorityBlockingQueue( - new TaskExecutorService.WaitQueueComparator(), 4); + new TaskExecutorService.ShortestJobFirstComparator(), 4); assertNull(queue.offer(r1)); assertEquals(r1, queue.peek()); assertNull(queue.offer(r2)); @@ -166,7 +166,7 @@ public void testWaitQueueComparator() throws InterruptedException { r4 = createTaskWrapper(createRequest(4, 1, 400), false, 1000000); r5 = createTaskWrapper(createRequest(5, 10, 500), true, 1000000); queue = new EvictingPriorityBlockingQueue( - new TaskExecutorService.WaitQueueComparator(), 4); + new TaskExecutorService.ShortestJobFirstComparator(), 4); assertNull(queue.offer(r1)); assertEquals(r1, queue.peek()); assertNull(queue.offer(r2)); @@ -175,7 +175,7 @@ public void testWaitQueueComparator() throws InterruptedException { assertEquals(r1, queue.peek()); assertNull(queue.offer(r4)); assertEquals(r1, queue.peek()); - // offer accepted and r2 gets evicted + // offer accepted and r4 gets evicted assertEquals(r4, queue.offer(r5)); assertEquals(r1, queue.take()); assertEquals(r3, queue.take()); @@ -188,7 +188,7 @@ public void testWaitQueueComparator() throws InterruptedException { r4 = createTaskWrapper(createRequest(4, 8, 400), false, 1000000); r5 = createTaskWrapper(createRequest(5, 10, 500), true, 1000000); queue = new EvictingPriorityBlockingQueue( - new TaskExecutorService.WaitQueueComparator(), 4); + new TaskExecutorService.ShortestJobFirstComparator(), 4); assertNull(queue.offer(r1)); assertEquals(r1, queue.peek()); assertNull(queue.offer(r2)); @@ -210,7 +210,7 @@ public void testWaitQueueComparator() throws InterruptedException { r4 = createTaskWrapper(createRequest(4, 8, 400), false, 1000000); r5 = createTaskWrapper(createRequest(5, 10, 500), true, 1000000); queue = new EvictingPriorityBlockingQueue( - new TaskExecutorService.WaitQueueComparator(), 4); + new TaskExecutorService.ShortestJobFirstComparator(), 4); assertNull(queue.offer(r1)); assertEquals(r1, queue.peek()); assertNull(queue.offer(r2)); @@ -232,7 +232,7 @@ public void testWaitQueueComparator() throws InterruptedException { r4 = createTaskWrapper(createRequest(4, 8, 400), true, 1000000); r5 = createTaskWrapper(createRequest(5, 10, 500), true, 1000000); queue = new EvictingPriorityBlockingQueue( - new TaskExecutorService.WaitQueueComparator(), 4); + new TaskExecutorService.ShortestJobFirstComparator(), 4); assertNull(queue.offer(r1)); assertEquals(r1, queue.peek()); assertNull(queue.offer(r2)); diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java new file mode 100644 index 0000000..ad2a15b --- /dev/null +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java @@ -0,0 +1,278 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.daemon.impl; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; +import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler; +import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.runtime.api.impl.ExecutionContextImpl; +import org.apache.tez.runtime.task.EndReason; +import org.apache.tez.runtime.task.TaskRunner2Result; +import org.junit.Before; +import org.junit.Test; + +public class TestTaskExecutorService2 { + private static Configuration conf; + private static Credentials cred = new Credentials(); + + private static class MockRequest extends TaskRunnerCallable { + private int workTime; + private boolean canFinish; + + public MockRequest(SubmitWorkRequestProto requestProto, + boolean canFinish, int workTime) { + super(requestProto, mock(QueryFragmentInfo.class), conf, + new ExecutionContextImpl("localhost"), null, cred, 0, null, null, null, + mock(KilledTaskHandler.class), mock( + FragmentCompletionHandler.class)); + this.workTime = workTime; + this.canFinish = canFinish; + } + + @Override + protected TaskRunner2Result callInternal() { + System.out.println(super.getRequestId() + " is executing.."); + try { + Thread.sleep(workTime); + } catch (InterruptedException e) { + return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false); + } + return new TaskRunner2Result(EndReason.SUCCESS, null, false); + } + + @Override + public boolean canFinish() { + return canFinish; + } + } + + @Before + public void setup() { + conf = new Configuration(); + } + + private SubmitWorkRequestProto createRequest(int fragmentNumber, int parallelism, int dagStartTime, + int attemptStartTime) { + ApplicationId appId = ApplicationId.newInstance(9999, 72); + TezDAGID dagId = TezDAGID.getInstance(appId, 1); + TezVertexID vId = TezVertexID.getInstance(dagId, 35); + TezTaskID tId = TezTaskID.getInstance(vId, 389); + TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tId, fragmentNumber); + return SubmitWorkRequestProto + .newBuilder() + .setFragmentSpec( + FragmentSpecProto + .newBuilder() + .setAttemptNumber(0) + .setDagName("MockDag") + .setFragmentNumber(fragmentNumber) + .setVertexName("MockVertex") + .setVertexParallelism(parallelism) + .setProcessorDescriptor( + EntityDescriptorProto.newBuilder().setClassName("MockProcessor").build()) + .setFragmentIdentifierString(taId.toString()).build()).setAmHost("localhost") + .setAmPort(12345).setAppAttemptNumber(0).setApplicationIdString("MockApp_1") + .setContainerIdString("MockContainer_1").setUser("MockUser") + .setTokenIdentifier("MockToken_1") + .setFragmentRuntimeInfo(LlapDaemonProtocolProtos + .FragmentRuntimeInfo + .newBuilder() + .setDagStartTime(dagStartTime) + .setFirstAttemptStartTime(attemptStartTime) + .build()) + .build(); + } + + @Test + public void testWaitQueueComparator() throws InterruptedException { + TaskWrapper r1 = createTaskWrapper(createRequest(1, 2, 5, 100), false, 100000); + TaskWrapper r2 = createTaskWrapper(createRequest(2, 4, 4, 200), false, 100000); + TaskWrapper r3 = createTaskWrapper(createRequest(3, 6, 3, 300), false, 1000000); + TaskWrapper r4 = createTaskWrapper(createRequest(4, 8, 2, 400), false, 1000000); + TaskWrapper r5 = createTaskWrapper(createRequest(5, 10, 1, 500), false, 1000000); + EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue<>( + new TaskExecutorService.FirstInFirstOutComparator(), 4); + assertNull(queue.offer(r1)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r2)); + assertEquals(r2, queue.peek()); + assertNull(queue.offer(r3)); + assertEquals(r3, queue.peek()); + assertNull(queue.offer(r4)); + assertEquals(r4, queue.peek()); + // this offer will be accepted and r1 evicted + assertEquals(r1, queue.offer(r5)); + assertEquals(r5, queue.take()); + assertEquals(r4, queue.take()); + assertEquals(r3, queue.take()); + assertEquals(r2, queue.take()); + + r1 = createTaskWrapper(createRequest(1, 2, 5, 100), true, 100000); + r2 = createTaskWrapper(createRequest(2, 4, 4, 200), true, 100000); + r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000); + r4 = createTaskWrapper(createRequest(4, 8, 2, 400), true, 1000000); + r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000); + queue = new EvictingPriorityBlockingQueue( + new TaskExecutorService.FirstInFirstOutComparator(), 4); + assertNull(queue.offer(r1)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r2)); + assertEquals(r2, queue.peek()); + assertNull(queue.offer(r3)); + assertEquals(r3, queue.peek()); + assertNull(queue.offer(r4)); + assertEquals(r4, queue.peek()); + // this offer will be accpeted and r1 evicted + assertEquals(r1, queue.offer(r5)); + assertEquals(r5, queue.take()); + assertEquals(r4, queue.take()); + assertEquals(r3, queue.take()); + assertEquals(r2, queue.take()); + + r1 = createTaskWrapper(createRequest(1, 1, 5, 100), true, 100000); + r2 = createTaskWrapper(createRequest(2, 1, 4, 200), false, 100000); + r3 = createTaskWrapper(createRequest(3, 1, 3, 300), true, 1000000); + r4 = createTaskWrapper(createRequest(4, 1, 2, 400), false, 1000000); + r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000); + queue = new EvictingPriorityBlockingQueue( + new TaskExecutorService.FirstInFirstOutComparator(), 4); + assertNull(queue.offer(r1)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r2)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r3)); + assertEquals(r3, queue.peek()); + assertNull(queue.offer(r4)); + assertEquals(r3, queue.peek()); + // offer accepted and r2 gets evicted + assertEquals(r2, queue.offer(r5)); + assertEquals(r5, queue.take()); + assertEquals(r3, queue.take()); + assertEquals(r1, queue.take()); + assertEquals(r4, queue.take()); + + r1 = createTaskWrapper(createRequest(1, 2, 5, 100), true, 100000); + r2 = createTaskWrapper(createRequest(2, 4, 4, 200), false, 100000); + r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000); + r4 = createTaskWrapper(createRequest(4, 8, 2, 400), false, 1000000); + r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000); + queue = new EvictingPriorityBlockingQueue( + new TaskExecutorService.FirstInFirstOutComparator(), 4); + assertNull(queue.offer(r1)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r2)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r3)); + assertEquals(r3, queue.peek()); + assertNull(queue.offer(r4)); + assertEquals(r3, queue.peek()); + // offer accepted and r2 gets evicted + assertEquals(r2, queue.offer(r5)); + assertEquals(r5, queue.take()); + assertEquals(r3, queue.take()); + assertEquals(r1, queue.take()); + assertEquals(r4, queue.take()); + + r1 = createTaskWrapper(createRequest(1, 2, 5, 100), true, 100000); + r2 = createTaskWrapper(createRequest(2, 4, 4, 200), false, 100000); + r3 = createTaskWrapper(createRequest(3, 6, 3, 300), false, 1000000); + r4 = createTaskWrapper(createRequest(4, 8, 2, 400), false, 1000000); + r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000); + queue = new EvictingPriorityBlockingQueue( + new TaskExecutorService.FirstInFirstOutComparator(), 4); + assertNull(queue.offer(r1)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r2)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r3)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r4)); + assertEquals(r1, queue.peek()); + // offer accepted and r2 gets evicted + assertEquals(r2, queue.offer(r5)); + assertEquals(r5, queue.take()); + assertEquals(r1, queue.take()); + assertEquals(r4, queue.take()); + assertEquals(r3, queue.take()); + + r1 = createTaskWrapper(createRequest(1, 2, 5, 100), false, 100000); + r2 = createTaskWrapper(createRequest(2, 4, 4, 200), true, 100000); + r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000); + r4 = createTaskWrapper(createRequest(4, 8, 2, 400), true, 1000000); + r5 = createTaskWrapper(createRequest(5, 10, 1, 500), true, 1000000); + queue = new EvictingPriorityBlockingQueue( + new TaskExecutorService.FirstInFirstOutComparator(), 4); + assertNull(queue.offer(r1)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r2)); + assertEquals(r2, queue.peek()); + assertNull(queue.offer(r3)); + assertEquals(r3, queue.peek()); + assertNull(queue.offer(r4)); + assertEquals(r4, queue.peek()); + // offer accepted, r1 evicted + assertEquals(r1, queue.offer(r5)); + assertEquals(r5, queue.take()); + assertEquals(r4, queue.take()); + assertEquals(r3, queue.take()); + assertEquals(r2, queue.take()); + + r1 = createTaskWrapper(createRequest(1, 2, 5, 100), false, 100000); + r2 = createTaskWrapper(createRequest(2, 4, 4, 200), true, 100000); + r3 = createTaskWrapper(createRequest(3, 6, 3, 300), true, 1000000); + r4 = createTaskWrapper(createRequest(4, 8, 2, 400), true, 1000000); + r5 = createTaskWrapper(createRequest(5, 10, 2, 500), true, 1000000); + queue = new EvictingPriorityBlockingQueue( + new TaskExecutorService.FirstInFirstOutComparator(), 4); + assertNull(queue.offer(r1)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r2)); + assertEquals(r2, queue.peek()); + assertNull(queue.offer(r3)); + assertEquals(r3, queue.peek()); + assertNull(queue.offer(r4)); + assertEquals(r4, queue.peek()); + // offer accepted, r1 evicted + assertEquals(r1, queue.offer(r5)); + assertEquals(r4, queue.take()); + assertEquals(r5, queue.take()); + assertEquals(r3, queue.take()); + assertEquals(r2, queue.take()); + } + + private TaskWrapper createTaskWrapper(SubmitWorkRequestProto request, boolean canFinish, int workTime) { + MockRequest mockRequest = new MockRequest(request, canFinish, workTime); + TaskWrapper taskWrapper = new TaskWrapper(mockRequest, null); + return taskWrapper; + } +}