it = list.descendingIterator(); it.hasNext();) {
+ E e = it.next();
+ if (o.equals(e)) {
+ it.remove();
+ return true;
+ }
+ }
+ return false;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ // BlockingQueue methods
+
+ /**
+ * Inserts the specified element to the deque unless it would
+ * violate capacity restrictions. When using a capacity-restricted deque,
+ * it is generally preferable to use method {@link #offer(Object) offer}.
+ *
+ * This method is equivalent to {@link #addLast}.
+ *
+ * @throws IllegalStateException if the element cannot be added at this
+ * time due to capacity restrictions
+ * @throws NullPointerException if the specified element is null
+ */
+ @Override
+ public boolean add(E e) {
+ addLast(e);
+ return true;
+ }
+
+ /**
+ * @throws NullPointerException if the specified element is null
+ */
+ public boolean offer(E e) {
+ return offerLast(e);
+ }
+
+ /**
+ * @throws NullPointerException {@inheritDoc}
+ * @throws InterruptedException {@inheritDoc}
+ */
+ public void put(E e) throws InterruptedException {
+ putLast(e);
+ }
+
+ /**
+ * @throws NullPointerException {@inheritDoc}
+ * @throws InterruptedException {@inheritDoc}
+ */
+ public boolean offer(E e, long timeout, TimeUnit unit)
+ throws InterruptedException {
+ return offerLast(e, timeout, unit);
+ }
+
+ /**
+ * Retrieves and removes the head of the queue represented by this deque.
+ * This method differs from {@link #poll poll} only in that it throws an
+ * exception if this deque is empty.
+ *
+ * This method is equivalent to {@link #removeFirst() removeFirst}.
+ *
+ * @return the head of the queue represented by this deque
+ * @throws NoSuchElementException if this deque is empty
+ */
+ @Override
+ public E remove() {
+ return removeFirst();
+ }
+
+ public E poll() {
+ return pollFirst();
+ }
+
+ public E take() throws InterruptedException {
+ return takeFirst();
+ }
+
+ public E poll(long timeout, TimeUnit unit) throws InterruptedException {
+ return pollFirst(timeout, unit);
+ }
+
+ /**
+ * Retrieves, but does not remove, the head of the queue represented by
+ * this deque. This method differs from {@link #peek peek} only in that
+ * it throws an exception if this deque is empty.
+ *
+ * This method is equivalent to {@link #getFirst() getFirst}.
+ *
+ * @return the head of the queue represented by this deque
+ * @throws NoSuchElementException if this deque is empty
+ */
+ @Override
+ public E element() {
+ return getFirst();
+ }
+
+ public E peek() {
+ return peekFirst();
+ }
+
+ /**
+ * Returns the number of additional elements that this deque can ideally
+ * (in the absence of memory or resource constraints) accept without
+ * blocking. This is always equal to the initial capacity of this deque
+ * less the current size of this deque.
+ *
+ * Note that you cannot always tell if an attempt to insert
+ * an element will succeed by inspecting remainingCapacity
+ * because it may be the case that another thread is about to
+ * insert or remove an element.
+ */
+ public int remainingCapacity() {
+ lock.lock();
+ try {
+ return capacity - list.size();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * @throws UnsupportedOperationException {@inheritDoc}
+ * @throws ClassCastException {@inheritDoc}
+ * @throws NullPointerException {@inheritDoc}
+ * @throws IllegalArgumentException {@inheritDoc}
+ */
+ public int drainTo(Collection super E> c) {
+ if (c == null)
+ throw new NullPointerException();
+ if (c == this)
+ throw new IllegalArgumentException();
+ lock.lock();
+ try {
+ for (E e : list) {
+ c.add(e);
+ }
+ int n = list.size();
+ list.clear();
+ notFull.signalAll();
+ return n;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * @throws UnsupportedOperationException {@inheritDoc}
+ * @throws ClassCastException {@inheritDoc}
+ * @throws NullPointerException {@inheritDoc}
+ * @throws IllegalArgumentException {@inheritDoc}
+ */
+ public int drainTo(Collection super E> c, int maxElements) {
+ if (c == null)
+ throw new NullPointerException();
+ if (c == this)
+ throw new IllegalArgumentException();
+ lock.lock();
+ try {
+ int n = 0;
+ for (Iterator it = list.iterator(); n < maxElements && it.hasNext();) {
+ E e = it.next();
+ c.add(e);
+ it.remove();
+ ++n;
+ }
+
+ notFull.signalAll();
+ return n;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ // Stack methods
+
+ /**
+ * @throws IllegalStateException {@inheritDoc}
+ * @throws NullPointerException {@inheritDoc}
+ */
+ public void push(E e) {
+ addFirst(e);
+ }
+
+ /**
+ * @throws NoSuchElementException {@inheritDoc}
+ */
+ public E pop() {
+ return removeFirst();
+ }
+
+ // Collection methods
+
+ /**
+ * Removes the first occurrence of the specified element from this deque.
+ * If the deque does not contain the element, it is unchanged.
+ * More formally, removes the first element e such that
+ * o.equals(e) (if such an element exists).
+ * Returns true if this deque contained the specified element
+ * (or equivalently, if this deque changed as a result of the call).
+ *
+ * This method is equivalent to
+ * {@link #removeFirstOccurrence(Object) removeFirstOccurrence}.
+ *
+ * @param o element to be removed from this deque, if present
+ * @return true if this deque changed as a result of the call
+ */
+ @Override
+ public boolean remove(Object o) {
+ return removeFirstOccurrence(o);
+ }
+
+ /**
+ * Returns the number of elements in this deque.
+ *
+ * @return the number of elements in this deque
+ */
+ @Override
+ public int size() {
+ lock.lock();
+ try {
+ return list.size();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Returns true if this deque contains the specified element.
+ * More formally, returns true if and only if this deque contains
+ * at least one element e such that o.equals(e).
+ *
+ * @param o object to be checked for containment in this deque
+ * @return true if this deque contains the specified element
+ */
+ @Override
+ public boolean contains(Object o) {
+ if (o == null) return false;
+ lock.lock();
+ try {
+ return list.contains(o);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Returns an array containing all of the elements in this deque, in
+ * proper sequence (from first to last element).
+ *
+ * The returned array will be "safe" in that no references to it are
+ * maintained by this deque. (In other words, this method must allocate
+ * a new array). The caller is thus free to modify the returned array.
+ *
+ * This method acts as bridge between array-based and collection-based
+ * APIs.
+ *
+ * @return an array containing all of the elements in this deque
+ */
+ @Override
+ public Object[] toArray() {
+ lock.lock();
+ try {
+ return list.toArray();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Returns an array containing all of the elements in this deque, in
+ * proper sequence; the runtime type of the returned array is that of
+ * the specified array. If the deque fits in the specified array, it
+ * is returned therein. Otherwise, a new array is allocated with the
+ * runtime type of the specified array and the size of this deque.
+ *
+ * If this deque fits in the specified array with room to spare
+ * (i.e., the array has more elements than this deque), the element in
+ * the array immediately following the end of the deque is set to
+ * null.
+ *
+ * Like the {@link #toArray()} method, this method acts as bridge between
+ * array-based and collection-based APIs. Further, this method allows
+ * precise control over the runtime type of the output array, and may,
+ * under certain circumstances, be used to save allocation costs.
+ *
+ * Suppose x is a deque known to contain only strings.
+ * The following code can be used to dump the deque into a newly
+ * allocated array of String:
+ *
+ *
+ * String[] y = x.toArray(new String[0]);
+ *
+ * Note that toArray(new Object[0]) is identical in function to
+ * toArray().
+ *
+ * @param a the array into which the elements of the deque are to
+ * be stored, if it is big enough; otherwise, a new array of the
+ * same runtime type is allocated for this purpose
+ * @return an array containing all of the elements in this deque
+ * @throws ArrayStoreException if the runtime type of the specified array
+ * is not a supertype of the runtime type of every element in
+ * this deque
+ * @throws NullPointerException if the specified array is null
+ */
+ @Override
+ public T[] toArray(T[] a) {
+ lock.lock();
+ try {
+ return list.toArray(a);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public String toString() {
+ lock.lock();
+ try {
+ return super.toString();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Atomically removes all of the elements from this deque.
+ * The deque will be empty after this call returns.
+ */
+ @Override
+ public void clear() {
+ lock.lock();
+ try {
+ list.clear();
+ notFull.signalAll();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public Iterator iterator() {
+ return list.iterator();
+ }
+
+ public Iterator descendingIterator() {
+ return list.descendingIterator();
+ }
+}
\ No newline at end of file
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 42a3528..5253e5b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -30,7 +30,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.tez.runtime.task.EndReason;
import org.apache.tez.runtime.task.TaskRunner2Result;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
@@ -39,8 +42,6 @@
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Task executor service provides method for scheduling tasks. Tasks submitted to executor service
@@ -73,10 +74,9 @@
// to wait queue
private final Object waitLock;
private final ListeningExecutorService executorService;
- private final BlockingQueue waitQueue;
+ private final EvictingPriorityBlockingQueue waitQueue;
private final ListeningExecutorService waitQueueExecutorService;
private final Map idToTaskMap;
- private final Map> preemptionMap;
private final BlockingQueue preemptionQueue;
private final boolean enablePreemption;
private final ThreadPoolExecutor threadPoolExecutor;
@@ -84,7 +84,7 @@
public TaskExecutorService(int numExecutors, int waitQueueSize, boolean enablePreemption) {
this.waitLock = new Object();
- this.waitQueue = new BoundedPriorityBlockingQueue<>(new WaitQueueComparator(), waitQueueSize);
+ this.waitQueue = new EvictingPriorityBlockingQueue<>(new WaitQueueComparator(), waitQueueSize);
this.threadPoolExecutor = new ThreadPoolExecutor(numExecutors, // core pool size
numExecutors, // max pool size
1, TimeUnit.MINUTES,
@@ -92,7 +92,6 @@ public TaskExecutorService(int numExecutors, int waitQueueSize, boolean enablePr
new ThreadFactoryBuilder().setNameFormat(TASK_EXECUTOR_THREAD_NAME_FORMAT).build());
this.executorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
this.idToTaskMap = new ConcurrentHashMap<>();
- this.preemptionMap = new ConcurrentHashMap<>();
this.preemptionQueue = new PriorityBlockingQueue<>(numExecutors,
new PreemptionQueueComparator());
this.enablePreemption = enablePreemption;
@@ -171,7 +170,8 @@ public void onFailure(Throwable t) {
@Override
public void schedule(TaskRunnerCallable task) throws RejectedExecutionException {
- if (waitQueue.offer(task)) {
+ TaskRunnerCallable evictedTask = waitQueue.offer(task);
+ if (evictedTask == null) {
if (isDebugEnabled) {
LOG.debug(task.getRequestId() + " added to wait queue.");
}
@@ -180,7 +180,10 @@ public void schedule(TaskRunnerCallable task) throws RejectedExecutionException
waitLock.notify();
}
} else {
- throw new RejectedExecutionException("Queues are full. Rejecting request.");
+ evictedTask.killTask();
+ if (isInfoEnabled) {
+ LOG.info(task.getRequestId() + " evicted from wait queue because of low priority");
+ }
}
}
@@ -204,7 +207,7 @@ private boolean trySchedule(final TaskRunnerCallable task) {
LOG.debug(task.getRequestId() + " is not finishable and pre-emption is enabled."
+ "Adding it to pre-emption queue.");
}
- addTaskToPreemptionList(task, future);
+ addTaskToPreemptionList(task);
}
numSlotsAvailable.decrementAndGet();
@@ -215,62 +218,20 @@ private boolean trySchedule(final TaskRunnerCallable task) {
if (isTraceEnabled) {
LOG.trace("idToTaskMap: " + idToTaskMap.keySet());
- LOG.trace("preemptionMap: " + preemptionMap.keySet());
LOG.trace("preemptionQueue: " + preemptionQueue);
}
- TaskRunnerCallable pRequest = preemptionQueue.remove();
+ TaskRunnerCallable pRequest = preemptionQueue.peek();
- // if some task completes, it will remove itself from pre-emptions lists make this null.
+ // if some task completes, it will remove itself from pre-emption lists, making this null.
// if it happens bail out and schedule it again as a free slot will be available.
if (pRequest != null) {
if (isDebugEnabled) {
- LOG.debug(pRequest.getRequestId() + " is chosen for pre-emption.");
- }
-
- ListenableFuture> pFuture = preemptionMap.get(pRequest);
-
- // if pFuture is null, then it must have been completed and be removed from preemption map
- if (pFuture != null) {
- if (isDebugEnabled) {
- LOG.debug("Pre-emption invoked for " + pRequest.getRequestId()
- + " by interrupting the thread.");
- }
- pRequest.killTask();
- // TODO. Ideally, should wait for the thread to complete and fall off before assuming the
- // slot is available for the next task.
- removeTaskFromPreemptionList(pRequest, pRequest.getRequestId());
-
- // future is cancelled or completed normally, in which case schedule the new request
- if (pFuture.isDone() && pFuture.isCancelled()) {
- if (isDebugEnabled) {
- LOG.debug(pRequest.getRequestId() + " request preempted by " + task.getRequestId());
- }
- }
+ LOG.debug("Kill task invoked for " + pRequest.getRequestId() + " due to pre-emption");
}
- // try to submit the task from wait queue to executor service. If it gets rejected the
- // task from wait queue will hold on to its position for next try.
- try {
- ListenableFuture future = executorService.submit(task);
- FutureCallback wrappedCallback =
- new InternalCompletionListener(task.getCallback());
- Futures.addCallback(future, wrappedCallback);
- numSlotsAvailable.decrementAndGet();
- scheduled = true;
- if (isDebugEnabled) {
- LOG.debug("Request " + task.getRequestId() + " from wait queue submitted" +
- " to executor service.");
- }
- } catch (RejectedExecutionException e1) {
-
- // This should not happen as we just freed a slot from executor service by pre-emption,
- // which cannot be claimed by other tasks as trySchedule() is serially executed.
- scheduled = false;
- LOG.error("Request " + task.getRequestId() + " from wait queue rejected by" +
- " executor service.");
- }
+ pRequest.killTask();
}
}
}
@@ -281,14 +242,11 @@ private boolean trySchedule(final TaskRunnerCallable task) {
private synchronized void removeTaskFromPreemptionList(TaskRunnerCallable pRequest,
String requestId) {
idToTaskMap.remove(requestId);
- preemptionMap.remove(pRequest);
preemptionQueue.remove(pRequest);
}
- private synchronized void addTaskToPreemptionList(TaskRunnerCallable task,
- ListenableFuture future) {
+ private synchronized void addTaskToPreemptionList(TaskRunnerCallable task) {
idToTaskMap.put(task.getRequestId(), task);
- preemptionMap.put(task, future);
preemptionQueue.add(task);
}
@@ -303,20 +261,20 @@ public InternalCompletionListener(TaskRunnerCallable.TaskRunnerCallback wrappedC
@Override
public void onSuccess(TaskRunner2Result result) {
wrappedCallback.onSuccess(result);
- updatePreemptionListAndNotify(true);
+ updatePreemptionListAndNotify(result.getEndReason());
}
@Override
public void onFailure(Throwable t) {
wrappedCallback.onFailure(t);
- updatePreemptionListAndNotify(false);
+ updatePreemptionListAndNotify(null);
}
- private void updatePreemptionListAndNotify(boolean success) {
+ private void updatePreemptionListAndNotify(EndReason reason) {
// if this task was added to pre-emption list, remove it
String taskId = wrappedCallback.getRequestId();
TaskRunnerCallable task = idToTaskMap.get(taskId);
- String state = success ? "succeeded" : "failed";
+ String state = reason == null ? "FAILED" : reason.name();
if (enablePreemption && task != null) {
removeTaskFromPreemptionList(task, taskId);
if (isDebugEnabled) {
@@ -370,7 +328,7 @@ public void shutDown(boolean awaitTermination) {
@VisibleForTesting
public int getPreemptionListSize() {
- return preemptionMap.size();
+ return preemptionQueue.size();
}
@VisibleForTesting
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
index 3b89b48..740a2ca 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
@@ -18,11 +18,11 @@
package org.apache.hadoop.hive.llap.daemon.impl;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
@@ -39,9 +39,6 @@
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.task.EndReason;
import org.apache.tez.runtime.task.TaskRunner2Result;
-import org.apache.tez.runtime.task.TezChild;
-import org.apache.tez.runtime.task.TezChild.ContainerExecutionResult;
-import org.apache.tez.runtime.task.TezChild.ContainerExecutionResult.ExitStatus;
import org.junit.Before;
import org.junit.Test;
@@ -62,9 +59,13 @@ public MockRequest(LlapDaemonProtocolProtos.SubmitWorkRequestProto requestProto,
}
@Override
- protected TaskRunner2Result callInternal() throws Exception {
+ protected TaskRunner2Result callInternal() {
System.out.println(requestId + " is executing..");
- Thread.sleep(workTime);
+ try {
+ Thread.sleep(workTime);
+ } catch (InterruptedException e) {
+ return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false);
+ }
return new TaskRunner2Result(EndReason.SUCCESS, null, false);
}
@@ -102,62 +103,7 @@ private SubmitWorkRequestProto createRequest(int fragmentNumber, int parallelism
.setContainerIdString("MockContainer_1").setUser("MockUser")
.setTokenIdentifier("MockToken_1").build();
}
-
-
- @Test(expected = RejectedExecutionException.class)
- public void testThreadPoolRejection() throws InterruptedException {
- TaskExecutorService scheduler = new TaskExecutorService(2, 2, false);
- scheduler.schedule(new MockRequest(createRequest(1, 4), true, 1000));
- Thread.sleep(100);
- scheduler.schedule(new MockRequest(createRequest(2, 4), true, 1000));
- Thread.sleep(100);
- assertEquals(0, scheduler.getPreemptionListSize());
- scheduler.schedule(new MockRequest(createRequest(3, 4), true, 1000));
- Thread.sleep(100);
- scheduler.schedule(new MockRequest(createRequest(4, 4), true, 1000));
- Thread.sleep(100);
- assertEquals(0, scheduler.getPreemptionListSize());
- // this request should be rejected
- scheduler.schedule(new MockRequest(createRequest(5, 8), true, 1000));
- }
-
- @Test
- public void testPreemption() throws InterruptedException {
- TaskExecutorService scheduler = new TaskExecutorService(2, 2, true);
- scheduler.schedule(new MockRequest(createRequest(1, 4), false, 100000));
- Thread.sleep(100);
- scheduler.schedule(new MockRequest(createRequest(2, 4), false, 100000));
- Thread.sleep(100);
- assertEquals(2, scheduler.getPreemptionListSize());
- // these should invoke preemption
- scheduler.schedule(new MockRequest(createRequest(3, 8), true, 1000));
- Thread.sleep(100);
- scheduler.schedule(new MockRequest(createRequest(4, 8), true, 1000));
- Thread.sleep(100);
- assertEquals(0, scheduler.getPreemptionListSize());
- }
-
- @Test
- public void testPreemptionOrder() throws InterruptedException {
- TaskExecutorService scheduler = new TaskExecutorService(2, 2, true);
- MockRequest r1 = new MockRequest(createRequest(1, 4), false, 100000);
- scheduler.schedule(r1);
- Thread.sleep(100);
- MockRequest r2 = new MockRequest(createRequest(2, 4), false, 100000);
- scheduler.schedule(r2);
- Thread.sleep(100);
- assertEquals(r1, scheduler.getPreemptionTask());
- // these should invoke preemption
- scheduler.schedule(new MockRequest(createRequest(3, 8), true, 1000));
- // wait till pre-emption to kick-in and complete
- Thread.sleep(100);
- assertEquals(r2, scheduler.getPreemptionTask());
- scheduler.schedule(new MockRequest(createRequest(4, 8), true, 1000));
- // wait till pre-emption to kick-in and complete
- Thread.sleep(100);
- assertEquals(0, scheduler.getPreemptionListSize());
- }
-
+
@Test
public void testWaitQueueComparator() throws InterruptedException {
MockRequest r1 = new MockRequest(createRequest(1, 2), false, 100000);
@@ -165,17 +111,18 @@ public void testWaitQueueComparator() throws InterruptedException {
MockRequest r3 = new MockRequest(createRequest(3, 6), false, 1000000);
MockRequest r4 = new MockRequest(createRequest(4, 8), false, 1000000);
MockRequest r5 = new MockRequest(createRequest(5, 10), false, 1000000);
- BlockingQueue queue = new BoundedPriorityBlockingQueue(
+ EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue(
new TaskExecutorService.WaitQueueComparator(), 4);
- queue.offer(r1);
+ assertNull(queue.offer(r1));
assertEquals(r1, queue.peek());
- queue.offer(r2);
+ assertNull(queue.offer(r2));
assertEquals(r1, queue.peek());
- queue.offer(r3);
+ assertNull(queue.offer(r3));
assertEquals(r1, queue.peek());
- queue.offer(r4);
+ assertNull(queue.offer(r4));
assertEquals(r1, queue.peek());
- assertEquals(false, queue.offer(r5));
+ // this offer will be rejected
+ assertEquals(r5, queue.offer(r5));
assertEquals(r1, queue.take());
assertEquals(r2, queue.take());
assertEquals(r3, queue.take());
@@ -186,17 +133,18 @@ public void testWaitQueueComparator() throws InterruptedException {
r3 = new MockRequest(createRequest(3, 6), true, 1000000);
r4 = new MockRequest(createRequest(4, 8), true, 1000000);
r5 = new MockRequest(createRequest(5, 10), true, 1000000);
- queue = new BoundedPriorityBlockingQueue(
+ queue = new EvictingPriorityBlockingQueue(
new TaskExecutorService.WaitQueueComparator(), 4);
- queue.offer(r1);
+ assertNull(queue.offer(r1));
assertEquals(r1, queue.peek());
- queue.offer(r2);
+ assertNull(queue.offer(r2));
assertEquals(r1, queue.peek());
- queue.offer(r3);
+ assertNull(queue.offer(r3));
assertEquals(r1, queue.peek());
- queue.offer(r4);
+ assertNull(queue.offer(r4));
assertEquals(r1, queue.peek());
- assertEquals(false, queue.offer(r5));
+ // this offer will be rejected
+ assertEquals(r5, queue.offer(r5));
assertEquals(r1, queue.take());
assertEquals(r2, queue.take());
assertEquals(r3, queue.take());
@@ -207,84 +155,90 @@ public void testWaitQueueComparator() throws InterruptedException {
r3 = new MockRequest(createRequest(3, 1), true, 1000000);
r4 = new MockRequest(createRequest(4, 1), false, 1000000);
r5 = new MockRequest(createRequest(5, 10), true, 1000000);
- queue = new BoundedPriorityBlockingQueue(
+ queue = new EvictingPriorityBlockingQueue(
new TaskExecutorService.WaitQueueComparator(), 4);
- queue.offer(r1);
+ assertNull(queue.offer(r1));
assertEquals(r1, queue.peek());
- queue.offer(r2);
+ assertNull(queue.offer(r2));
assertEquals(r1, queue.peek());
- queue.offer(r3);
- assertEquals(r1, queue.peek());
- queue.offer(r4);
- assertEquals(r1, queue.peek());
- assertEquals(false, queue.offer(r5));
- assertEquals(r1, queue.take());
+ assertNull(queue.offer(r3));
+ // same priority with r1
+ assertEquals(r3, queue.peek());
+ // same priority with r2
+ assertNull(queue.offer(r4));
+ assertEquals(r3, queue.peek());
+ // offer accepted and r2 gets evicted
+ assertEquals(r2, queue.offer(r5));
assertEquals(r3, queue.take());
+ assertEquals(r1, queue.take());
+ assertEquals(r5, queue.take());
assertEquals(r4, queue.take());
- assertEquals(r2, queue.take());
r1 = new MockRequest(createRequest(1, 2), true, 100000);
r2 = new MockRequest(createRequest(2, 4), false, 100000);
r3 = new MockRequest(createRequest(3, 6), true, 1000000);
r4 = new MockRequest(createRequest(4, 8), false, 1000000);
r5 = new MockRequest(createRequest(5, 10), true, 1000000);
- queue = new BoundedPriorityBlockingQueue(
+ queue = new EvictingPriorityBlockingQueue(
new TaskExecutorService.WaitQueueComparator(), 4);
- queue.offer(r1);
+ assertNull(queue.offer(r1));
assertEquals(r1, queue.peek());
- queue.offer(r2);
+ assertNull(queue.offer(r2));
assertEquals(r1, queue.peek());
- queue.offer(r3);
+ assertNull(queue.offer(r3));
assertEquals(r1, queue.peek());
- queue.offer(r4);
+ assertNull(queue.offer(r4));
assertEquals(r1, queue.peek());
- assertEquals(false, queue.offer(r5));
+ // offer accepted and r4 gets evicted
+ assertEquals(r4, queue.offer(r5));
assertEquals(r1, queue.take());
assertEquals(r3, queue.take());
+ assertEquals(r5, queue.take());
assertEquals(r2, queue.take());
- assertEquals(r4, queue.take());
r1 = new MockRequest(createRequest(1, 2), true, 100000);
r2 = new MockRequest(createRequest(2, 4), false, 100000);
r3 = new MockRequest(createRequest(3, 6), false, 1000000);
r4 = new MockRequest(createRequest(4, 8), false, 1000000);
r5 = new MockRequest(createRequest(5, 10), true, 1000000);
- queue = new BoundedPriorityBlockingQueue(
+ queue = new EvictingPriorityBlockingQueue(
new TaskExecutorService.WaitQueueComparator(), 4);
- queue.offer(r1);
+ assertNull(queue.offer(r1));
assertEquals(r1, queue.peek());
- queue.offer(r2);
+ assertNull(queue.offer(r2));
assertEquals(r1, queue.peek());
- queue.offer(r3);
+ assertNull(queue.offer(r3));
assertEquals(r1, queue.peek());
- queue.offer(r4);
+ assertNull(queue.offer(r4));
assertEquals(r1, queue.peek());
- assertEquals(false, queue.offer(r5));
+ // offer accepted and r4 gets evicted
+ assertEquals(r4, queue.offer(r5));
assertEquals(r1, queue.take());
+ assertEquals(r5, queue.take());
assertEquals(r2, queue.take());
assertEquals(r3, queue.take());
- assertEquals(r4, queue.take());
r1 = new MockRequest(createRequest(1, 2), false, 100000);
r2 = new MockRequest(createRequest(2, 4), true, 100000);
r3 = new MockRequest(createRequest(3, 6), true, 1000000);
r4 = new MockRequest(createRequest(4, 8), true, 1000000);
r5 = new MockRequest(createRequest(5, 10), true, 1000000);
- queue = new BoundedPriorityBlockingQueue(
+ queue = new EvictingPriorityBlockingQueue(
new TaskExecutorService.WaitQueueComparator(), 4);
- queue.offer(r1);
+ assertNull(queue.offer(r1));
assertEquals(r1, queue.peek());
- queue.offer(r2);
+ assertNull(queue.offer(r2));
assertEquals(r2, queue.peek());
- queue.offer(r3);
+ assertNull(queue.offer(r3));
assertEquals(r2, queue.peek());
- queue.offer(r4);
+ assertNull(queue.offer(r4));
assertEquals(r2, queue.peek());
- assertEquals(false, queue.offer(r5));
+ // offer accepted, r1 evicted
+ assertEquals(r1, queue.offer(r5));
assertEquals(r2, queue.take());
assertEquals(r3, queue.take());
assertEquals(r4, queue.take());
- assertEquals(r1, queue.take());
+ assertEquals(r5, queue.take());
}
@Test
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
index 0722b9f..755f847 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java
@@ -75,14 +75,14 @@
public class TezJobMonitor {
private static final String CLASS_NAME = TezJobMonitor.class.getName();
- private static final int MIN_TERMINAL_WIDTH = 90;
+ private static final int MIN_TERMINAL_WIDTH = 92;
private static final int COLUMN_1_WIDTH = 16;
private static final int SEPARATOR_WIDTH = MIN_TERMINAL_WIDTH;
// keep this within 80 chars width. If more columns needs to be added then update min terminal
// width requirement and separator width accordingly
- private static final String HEADER_FORMAT = "%16s%10s %11s %5s %9s %7s %7s %6s %6s";
- private static final String VERTEX_FORMAT = "%-16s%10s %11s %5s %9s %7s %7s %6s %6s";
+ private static final String HEADER_FORMAT = "%16s%10s %11s %5s %9s %7s %7s %6s %6s ";
+ private static final String VERTEX_FORMAT = "%-16s%10s %11s %5s %9s %7s %7s %6s %6s ";
private static final String FOOTER_FORMAT = "%-15s %-30s %-4s %-25s";
private static final String HEADER = String.format(HEADER_FORMAT,
"VERTICES", "MODE", "STATUS", "TOTAL", "COMPLETED", "RUNNING", "PENDING", "FAILED", "KILLED");