diff --git a/LICENSE b/LICENSE index db3777d92a2622f1935b472598c11468b87b4113..d677b7097763e75699400875541e2668a52c0884 100644 --- a/LICENSE +++ b/LICENSE @@ -494,4 +494,36 @@ SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file +POSSIBILITY OF SUCH DAMAGE. + +For org.apache.hadoop.hive.llap.daemon.impl.PriorityBlockingDeque class: + +The BSD 3-Clause License + +Copyright (c) 2007, Aviad Ben Dov + +All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this list +of conditions and the following disclaimer. +2. Redistributions in binary form must reproduce the above copyright notice, this +list of conditions and the following disclaimer in the documentation and/or other +materials provided with the distribution. +3. Neither the name of Infomancers, Ltd. nor the names of its contributors may be +used to endorse or promote products derived from this software without specific +prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/BoundedPriorityBlockingQueue.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/BoundedPriorityBlockingQueue.java deleted file mode 100644 index 78d3c6cacfdd0dee96477baf6fd9dcdb9f016545..0000000000000000000000000000000000000000 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/BoundedPriorityBlockingQueue.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.llap.daemon.impl; - -import java.util.Comparator; -import java.util.concurrent.PriorityBlockingQueue; - -/** - * Priority blocking queue of bounded size. The entries that are added already added will be - * ordered based on the specified comparator. If the queue is full, offer() will return false and - * add() will throw IllegalStateException. - */ -public class BoundedPriorityBlockingQueue extends PriorityBlockingQueue { - private int maxSize; - - public BoundedPriorityBlockingQueue(int maxSize) { - this.maxSize = maxSize; - } - - public BoundedPriorityBlockingQueue(Comparator comparator, int maxSize) { - super(maxSize, comparator); - this.maxSize = maxSize; - } - - @Override - public boolean add(E e) { - if (size() >= maxSize) { - throw new IllegalStateException("BoundedPriorityBlockingQueue is full"); - } else { - return super.add(e); - } - } - - @Override - public boolean offer(E e) { - if (size() >= maxSize) { - return false; - } else { - return super.offer(e); - } - } -} diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java new file mode 100644 index 0000000000000000000000000000000000000000..f0425627760b3b45518379fe60b464c76534a44c --- /dev/null +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.daemon.impl; + +import java.util.Comparator; + +/** + * Bounded priority queue that evicts the last element based on priority order specified + * through comparator. Elements that are added to the queue are sorted based on the specified + * comparator. If the queue is full and if a new element is added to it, the new element is compared + * with the last element so as to claim a spot. The evicted element (or the added item) is then + * returned back. If the queue is not full, new element will be added to queue and null is returned. + */ +public class EvictingPriorityBlockingQueue { + private PriorityBlockingDeque deque; + private Comparator comparator; + + public EvictingPriorityBlockingQueue(Comparator comparator, int maxSize) { + this.deque = new PriorityBlockingDeque<>(comparator, maxSize); + this.comparator = comparator; + } + + public synchronized E offer(E e) { + if (deque.offer(e)) { + return null; + } else { + E last = deque.peekLast(); + if (comparator.compare(e, last) < 0) { + deque.removeLast(); + deque.offer(e); + return last; + } + return e; + } + } + + public boolean isEmpty() { + return deque.isEmpty(); + } + + public E peek() { + return deque.peek(); + } + + public E take() throws InterruptedException { + return deque.take(); + } + + public void remove(E e) { + deque.remove(e); + } +} diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java new file mode 100644 index 0000000000000000000000000000000000000000..db2ab161a608eedab4aa392a8d003a370c9bfe1f --- /dev/null +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java @@ -0,0 +1,767 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.llap.daemon.impl; + +import java.util.*; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * An optionally-bounded {@linkplain BlockingDeque blocking deque} based on + * a navigable set. + *

+ *

The optional capacity bound constructor argument serves as a + * way to prevent excessive expansion. The capacity, if unspecified, + * is equal to {@link Integer#MAX_VALUE}. + *

+ *

This class and its iterator implement all of the + * optional methods of the {@link Collection} and {@link + * Iterator} interfaces. + *

+ * This code is loosely based on the {@linkplain java.util.concurrent.LinkedBlockingDeque linked blocking deque} code. + */ +public class PriorityBlockingDeque + extends AbstractQueue + implements BlockingDeque, java.io.Serializable { + + /* + * Implemented as a navigable set protected by a + * single lock and using conditions to manage blocking. + */ + + private final int capacity; + + private final LinkedList list; + /** + * Main lock guarding all access + */ + private final ReentrantLock lock = new ReentrantLock(); + /** + * Condition for waiting takes + */ + private final Condition notEmpty = lock.newCondition(); + /** + * Condition for waiting puts + */ + private final Condition notFull = lock.newCondition(); + private Comparator comparator; + + /** + * Creates a PriorityBlockingDeque with a capacity of + * {@link Integer#MAX_VALUE}. + */ + public PriorityBlockingDeque() { + this(null, Integer.MAX_VALUE); + } + + /** + * Creates a PriorityBlockingDeque with the given (fixed) capacity. + * + * @param capacity the capacity of this deque + * @throws IllegalArgumentException if capacity is less than 1 + */ + public PriorityBlockingDeque(int capacity) { + this(null, capacity); + } + + public PriorityBlockingDeque(Comparator comparator, int capacity) { + if (capacity <= 0) throw new IllegalArgumentException(); + this.capacity = capacity; + this.list = new LinkedList(); + this.comparator = comparator; + } + + // Basic adding and removing operations, called only while holding lock + + /** + * Adds e or returns false if full. + * + * @param e The element to add. + * @return Whether adding was successful. + */ + private boolean innerAdd(E e) { + if (list.size() >= capacity) + return false; + + int insertionPoint = Collections.binarySearch(list, e, comparator); + if (insertionPoint < 0) { + // this means the key didn't exist, so the insertion point is negative minus 1. + insertionPoint = -insertionPoint - 1; + } + + list.add(insertionPoint, e); + // Collections.sort(list, comparator); + notEmpty.signal(); + + return true; + } + + /** + * Removes and returns first element, or null if empty. + * + * @return The removed element. + */ + private E innerRemoveFirst() { + E f = list.pollFirst(); + if (f == null) + return null; + + + notFull.signal(); + return f; + } + + /** + * Removes and returns last element, or null if empty. + * + * @return The removed element. + */ + private E innerRemoveLast() { + E l = list.pollLast(); + if (l == null) + return null; + + notFull.signal(); + return l; + } + + // BlockingDeque methods + + /** + * @throws IllegalStateException {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + */ + public void addFirst(E e) { + if (!offerFirst(e)) + throw new IllegalStateException("Deque full"); + } + + /** + * @throws IllegalStateException {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + */ + public void addLast(E e) { + if (!offerLast(e)) + throw new IllegalStateException("Deque full"); + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + public boolean offerFirst(E e) { + if (e == null) throw new NullPointerException(); + lock.lock(); + try { + return innerAdd(e); + } finally { + lock.unlock(); + } + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + public boolean offerLast(E e) { + if (e == null) throw new NullPointerException(); + lock.lock(); + try { + return innerAdd(e); + } finally { + lock.unlock(); + } + } + + /** + * @throws NullPointerException {@inheritDoc} + * @throws InterruptedException {@inheritDoc} + */ + public void putFirst(E e) throws InterruptedException { + if (e == null) throw new NullPointerException(); + lock.lock(); + try { + while (!innerAdd(e)) + notFull.await(); + } finally { + lock.unlock(); + } + } + + /** + * @throws NullPointerException {@inheritDoc} + * @throws InterruptedException {@inheritDoc} + */ + public void putLast(E e) throws InterruptedException { + if (e == null) throw new NullPointerException(); + lock.lock(); + try { + while (!innerAdd(e)) + notFull.await(); + } finally { + lock.unlock(); + } + } + + /** + * @throws NullPointerException {@inheritDoc} + * @throws InterruptedException {@inheritDoc} + */ + public boolean offerFirst(E e, long timeout, TimeUnit unit) + throws InterruptedException { + if (e == null) throw new NullPointerException(); + long nanos = unit.toNanos(timeout); + lock.lockInterruptibly(); + try { + for (; ;) { + if (innerAdd(e)) + return true; + if (nanos <= 0) + return false; + nanos = notFull.awaitNanos(nanos); + } + } finally { + lock.unlock(); + } + } + + /** + * @throws NullPointerException {@inheritDoc} + * @throws InterruptedException {@inheritDoc} + */ + public boolean offerLast(E e, long timeout, TimeUnit unit) + throws InterruptedException { + if (e == null) throw new NullPointerException(); + long nanos = unit.toNanos(timeout); + lock.lockInterruptibly(); + try { + for (; ;) { + if (innerAdd(e)) + return true; + if (nanos <= 0) + return false; + nanos = notFull.awaitNanos(nanos); + } + } finally { + lock.unlock(); + } + } + + /** + * @throws NoSuchElementException {@inheritDoc} + */ + public E removeFirst() { + E x = pollFirst(); + if (x == null) throw new NoSuchElementException(); + return x; + } + + /** + * @throws NoSuchElementException {@inheritDoc} + */ + public E removeLast() { + E x = pollLast(); + if (x == null) throw new NoSuchElementException(); + return x; + } + + public E pollFirst() { + lock.lock(); + try { + return innerRemoveFirst(); + } finally { + lock.unlock(); + } + } + + public E pollLast() { + lock.lock(); + try { + return innerRemoveLast(); + } finally { + lock.unlock(); + } + } + + public E takeFirst() throws InterruptedException { + lock.lock(); + try { + E x; + while ((x = innerRemoveFirst()) == null) + notEmpty.await(); + return x; + } finally { + lock.unlock(); + } + } + + public E takeLast() throws InterruptedException { + lock.lock(); + try { + E x; + while ((x = innerRemoveLast()) == null) + notEmpty.await(); + return x; + } finally { + lock.unlock(); + } + } + + public E pollFirst(long timeout, TimeUnit unit) + throws InterruptedException { + long nanos = unit.toNanos(timeout); + lock.lockInterruptibly(); + try { + for (; ;) { + E x = innerRemoveFirst(); + if (x != null) + return x; + if (nanos <= 0) + return null; + nanos = notEmpty.awaitNanos(nanos); + } + } finally { + lock.unlock(); + } + } + + public E pollLast(long timeout, TimeUnit unit) + throws InterruptedException { + long nanos = unit.toNanos(timeout); + lock.lockInterruptibly(); + try { + for (; ;) { + E x = innerRemoveLast(); + if (x != null) + return x; + if (nanos <= 0) + return null; + nanos = notEmpty.awaitNanos(nanos); + } + } finally { + lock.unlock(); + } + } + + /** + * @throws NoSuchElementException {@inheritDoc} + */ + public E getFirst() { + E x = peekFirst(); + if (x == null) throw new NoSuchElementException(); + return x; + } + + /** + * @throws NoSuchElementException {@inheritDoc} + */ + public E getLast() { + E x = peekLast(); + if (x == null) throw new NoSuchElementException(); + return x; + } + + public E peekFirst() { + lock.lock(); + try { + return list.size() == 0 ? null : list.peekFirst(); + } finally { + lock.unlock(); + } + } + + public E peekLast() { + lock.lock(); + try { + return list.size() == 0 ? null : list.peekLast(); + } finally { + lock.unlock(); + } + } + + public boolean removeFirstOccurrence(Object o) { + if (o == null) return false; + lock.lock(); + try { + for (Iterator it = list.iterator(); it.hasNext();) { + E e = it.next(); + if (o.equals(e)) { + it.remove(); + return true; + } + } + return false; + } finally { + lock.unlock(); + } + } + + public boolean removeLastOccurrence(Object o) { + if (o == null) return false; + lock.lock(); + try { + for (Iterator it = list.descendingIterator(); it.hasNext();) { + E e = it.next(); + if (o.equals(e)) { + it.remove(); + return true; + } + } + return false; + } finally { + lock.unlock(); + } + } + + // BlockingQueue methods + + /** + * Inserts the specified element to the deque unless it would + * violate capacity restrictions. When using a capacity-restricted deque, + * it is generally preferable to use method {@link #offer(Object) offer}. + *

+ *

This method is equivalent to {@link #addLast}. + * + * @throws IllegalStateException if the element cannot be added at this + * time due to capacity restrictions + * @throws NullPointerException if the specified element is null + */ + @Override + public boolean add(E e) { + addLast(e); + return true; + } + + /** + * @throws NullPointerException if the specified element is null + */ + public boolean offer(E e) { + return offerLast(e); + } + + /** + * @throws NullPointerException {@inheritDoc} + * @throws InterruptedException {@inheritDoc} + */ + public void put(E e) throws InterruptedException { + putLast(e); + } + + /** + * @throws NullPointerException {@inheritDoc} + * @throws InterruptedException {@inheritDoc} + */ + public boolean offer(E e, long timeout, TimeUnit unit) + throws InterruptedException { + return offerLast(e, timeout, unit); + } + + /** + * Retrieves and removes the head of the queue represented by this deque. + * This method differs from {@link #poll poll} only in that it throws an + * exception if this deque is empty. + *

+ *

This method is equivalent to {@link #removeFirst() removeFirst}. + * + * @return the head of the queue represented by this deque + * @throws NoSuchElementException if this deque is empty + */ + @Override + public E remove() { + return removeFirst(); + } + + public E poll() { + return pollFirst(); + } + + public E take() throws InterruptedException { + return takeFirst(); + } + + public E poll(long timeout, TimeUnit unit) throws InterruptedException { + return pollFirst(timeout, unit); + } + + /** + * Retrieves, but does not remove, the head of the queue represented by + * this deque. This method differs from {@link #peek peek} only in that + * it throws an exception if this deque is empty. + *

+ *

This method is equivalent to {@link #getFirst() getFirst}. + * + * @return the head of the queue represented by this deque + * @throws NoSuchElementException if this deque is empty + */ + @Override + public E element() { + return getFirst(); + } + + public E peek() { + return peekFirst(); + } + + /** + * Returns the number of additional elements that this deque can ideally + * (in the absence of memory or resource constraints) accept without + * blocking. This is always equal to the initial capacity of this deque + * less the current size of this deque. + *

+ *

Note that you cannot always tell if an attempt to insert + * an element will succeed by inspecting remainingCapacity + * because it may be the case that another thread is about to + * insert or remove an element. + */ + public int remainingCapacity() { + lock.lock(); + try { + return capacity - list.size(); + } finally { + lock.unlock(); + } + } + + /** + * @throws UnsupportedOperationException {@inheritDoc} + * @throws ClassCastException {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + * @throws IllegalArgumentException {@inheritDoc} + */ + public int drainTo(Collection c) { + if (c == null) + throw new NullPointerException(); + if (c == this) + throw new IllegalArgumentException(); + lock.lock(); + try { + for (E e : list) { + c.add(e); + } + int n = list.size(); + list.clear(); + notFull.signalAll(); + return n; + } finally { + lock.unlock(); + } + } + + /** + * @throws UnsupportedOperationException {@inheritDoc} + * @throws ClassCastException {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + * @throws IllegalArgumentException {@inheritDoc} + */ + public int drainTo(Collection c, int maxElements) { + if (c == null) + throw new NullPointerException(); + if (c == this) + throw new IllegalArgumentException(); + lock.lock(); + try { + int n = 0; + for (Iterator it = list.iterator(); n < maxElements && it.hasNext();) { + E e = it.next(); + c.add(e); + it.remove(); + ++n; + } + + notFull.signalAll(); + return n; + } finally { + lock.unlock(); + } + } + + // Stack methods + + /** + * @throws IllegalStateException {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + */ + public void push(E e) { + addFirst(e); + } + + /** + * @throws NoSuchElementException {@inheritDoc} + */ + public E pop() { + return removeFirst(); + } + + // Collection methods + + /** + * Removes the first occurrence of the specified element from this deque. + * If the deque does not contain the element, it is unchanged. + * More formally, removes the first element e such that + * o.equals(e) (if such an element exists). + * Returns true if this deque contained the specified element + * (or equivalently, if this deque changed as a result of the call). + *

+ *

This method is equivalent to + * {@link #removeFirstOccurrence(Object) removeFirstOccurrence}. + * + * @param o element to be removed from this deque, if present + * @return true if this deque changed as a result of the call + */ + @Override + public boolean remove(Object o) { + return removeFirstOccurrence(o); + } + + /** + * Returns the number of elements in this deque. + * + * @return the number of elements in this deque + */ + @Override + public int size() { + lock.lock(); + try { + return list.size(); + } finally { + lock.unlock(); + } + } + + /** + * Returns true if this deque contains the specified element. + * More formally, returns true if and only if this deque contains + * at least one element e such that o.equals(e). + * + * @param o object to be checked for containment in this deque + * @return true if this deque contains the specified element + */ + @Override + public boolean contains(Object o) { + if (o == null) return false; + lock.lock(); + try { + return list.contains(o); + } finally { + lock.unlock(); + } + } + + /** + * Returns an array containing all of the elements in this deque, in + * proper sequence (from first to last element). + *

+ *

The returned array will be "safe" in that no references to it are + * maintained by this deque. (In other words, this method must allocate + * a new array). The caller is thus free to modify the returned array. + *

+ *

This method acts as bridge between array-based and collection-based + * APIs. + * + * @return an array containing all of the elements in this deque + */ + @Override + public Object[] toArray() { + lock.lock(); + try { + return list.toArray(); + } finally { + lock.unlock(); + } + } + + /** + * Returns an array containing all of the elements in this deque, in + * proper sequence; the runtime type of the returned array is that of + * the specified array. If the deque fits in the specified array, it + * is returned therein. Otherwise, a new array is allocated with the + * runtime type of the specified array and the size of this deque. + *

+ *

If this deque fits in the specified array with room to spare + * (i.e., the array has more elements than this deque), the element in + * the array immediately following the end of the deque is set to + * null. + *

+ *

Like the {@link #toArray()} method, this method acts as bridge between + * array-based and collection-based APIs. Further, this method allows + * precise control over the runtime type of the output array, and may, + * under certain circumstances, be used to save allocation costs. + *

+ *

Suppose x is a deque known to contain only strings. + * The following code can be used to dump the deque into a newly + * allocated array of String: + *

+ *

+   *     String[] y = x.toArray(new String[0]);
+ *

+ * Note that toArray(new Object[0]) is identical in function to + * toArray(). + * + * @param a the array into which the elements of the deque are to + * be stored, if it is big enough; otherwise, a new array of the + * same runtime type is allocated for this purpose + * @return an array containing all of the elements in this deque + * @throws ArrayStoreException if the runtime type of the specified array + * is not a supertype of the runtime type of every element in + * this deque + * @throws NullPointerException if the specified array is null + */ + @Override + public T[] toArray(T[] a) { + lock.lock(); + try { + return list.toArray(a); + } finally { + lock.unlock(); + } + } + + @Override + public String toString() { + lock.lock(); + try { + return super.toString(); + } finally { + lock.unlock(); + } + } + + /** + * Atomically removes all of the elements from this deque. + * The deque will be empty after this call returns. + */ + @Override + public void clear() { + lock.lock(); + try { + list.clear(); + notFull.signalAll(); + } finally { + lock.unlock(); + } + } + + @Override + public Iterator iterator() { + return list.iterator(); + } + + public Iterator descendingIterator() { + return list.descendingIterator(); + } +} \ No newline at end of file diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index ed8df9543451b3a30fe95b4f1a60d0378c0da631..42d525463135b462e69b49a8faba4053355bf1a8 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -72,7 +72,7 @@ // to wait queue private final Object waitLock; private final ListeningExecutorService executorService; - private final BlockingQueue waitQueue; + private final EvictingPriorityBlockingQueue waitQueue; private final ListeningExecutorService waitQueueExecutorService; private final Map idToTaskMap; private final Map> preemptionMap; @@ -83,7 +83,7 @@ public TaskExecutorService(int numExecutors, int waitQueueSize, boolean enablePreemption) { this.waitLock = new Object(); - this.waitQueue = new BoundedPriorityBlockingQueue<>(new WaitQueueComparator(), waitQueueSize); + this.waitQueue = new EvictingPriorityBlockingQueue<>(new WaitQueueComparator(), waitQueueSize); this.threadPoolExecutor = new ThreadPoolExecutor(numExecutors, // core pool size numExecutors, // max pool size 1, TimeUnit.MINUTES, @@ -170,7 +170,8 @@ public void onFailure(Throwable t) { @Override public void schedule(TaskRunnerCallable task) throws RejectedExecutionException { - if (waitQueue.offer(task)) { + TaskRunnerCallable evictedTask = waitQueue.offer(task); + if (evictedTask == null) { if (isDebugEnabled) { LOG.debug(task.getRequestId() + " added to wait queue."); } @@ -179,6 +180,9 @@ public void schedule(TaskRunnerCallable task) throws RejectedExecutionException waitLock.notify(); } } else { + // TODO: Report to AM as taskFailure/taskKilled + notifyAM(evictedTask); + // TODO: remove this after implementing notifyAM throw new RejectedExecutionException("Queues are full. Rejecting request."); } } diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java index 44a4633eb89feeca403b8d98d80b78e72cf739eb..7cea6d8ce5182e087f613e38babd9d2e7504b474 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.llap.daemon.impl; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import java.util.concurrent.BlockingQueue; import java.util.concurrent.PriorityBlockingQueue; @@ -161,17 +162,18 @@ public void testWaitQueueComparator() throws InterruptedException { MockRequest r3 = new MockRequest(createRequest(3, 6), false, 1000000); MockRequest r4 = new MockRequest(createRequest(4, 8), false, 1000000); MockRequest r5 = new MockRequest(createRequest(5, 10), false, 1000000); - BlockingQueue queue = new BoundedPriorityBlockingQueue( + EvictingPriorityBlockingQueue queue = new EvictingPriorityBlockingQueue( new TaskExecutorService.WaitQueueComparator(), 4); - queue.offer(r1); + assertNull(queue.offer(r1)); assertEquals(r1, queue.peek()); - queue.offer(r2); + assertNull(queue.offer(r2)); assertEquals(r1, queue.peek()); - queue.offer(r3); + assertNull(queue.offer(r3)); assertEquals(r1, queue.peek()); - queue.offer(r4); + assertNull(queue.offer(r4)); assertEquals(r1, queue.peek()); - assertEquals(false, queue.offer(r5)); + // this offer will be rejected + assertEquals(r5, queue.offer(r5)); assertEquals(r1, queue.take()); assertEquals(r2, queue.take()); assertEquals(r3, queue.take()); @@ -182,17 +184,18 @@ public void testWaitQueueComparator() throws InterruptedException { r3 = new MockRequest(createRequest(3, 6), true, 1000000); r4 = new MockRequest(createRequest(4, 8), true, 1000000); r5 = new MockRequest(createRequest(5, 10), true, 1000000); - queue = new BoundedPriorityBlockingQueue( + queue = new EvictingPriorityBlockingQueue( new TaskExecutorService.WaitQueueComparator(), 4); - queue.offer(r1); + assertNull(queue.offer(r1)); assertEquals(r1, queue.peek()); - queue.offer(r2); + assertNull(queue.offer(r2)); assertEquals(r1, queue.peek()); - queue.offer(r3); + assertNull(queue.offer(r3)); assertEquals(r1, queue.peek()); - queue.offer(r4); + assertNull(queue.offer(r4)); assertEquals(r1, queue.peek()); - assertEquals(false, queue.offer(r5)); + // this offer will be rejected + assertEquals(r5, queue.offer(r5)); assertEquals(r1, queue.take()); assertEquals(r2, queue.take()); assertEquals(r3, queue.take()); @@ -203,84 +206,90 @@ public void testWaitQueueComparator() throws InterruptedException { r3 = new MockRequest(createRequest(3, 1), true, 1000000); r4 = new MockRequest(createRequest(4, 1), false, 1000000); r5 = new MockRequest(createRequest(5, 10), true, 1000000); - queue = new BoundedPriorityBlockingQueue( + queue = new EvictingPriorityBlockingQueue( new TaskExecutorService.WaitQueueComparator(), 4); - queue.offer(r1); - assertEquals(r1, queue.peek()); - queue.offer(r2); - assertEquals(r1, queue.peek()); - queue.offer(r3); + assertNull(queue.offer(r1)); assertEquals(r1, queue.peek()); - queue.offer(r4); + assertNull(queue.offer(r2)); assertEquals(r1, queue.peek()); - assertEquals(false, queue.offer(r5)); - assertEquals(r1, queue.take()); + assertNull(queue.offer(r3)); + // same priority with r1 + assertEquals(r3, queue.peek()); + // same priority with r2 + assertNull(queue.offer(r4)); + assertEquals(r3, queue.peek()); + // offer accepted and r2 gets evicted + assertEquals(r2, queue.offer(r5)); assertEquals(r3, queue.take()); + assertEquals(r1, queue.take()); + assertEquals(r5, queue.take()); assertEquals(r4, queue.take()); - assertEquals(r2, queue.take()); r1 = new MockRequest(createRequest(1, 2), true, 100000); r2 = new MockRequest(createRequest(2, 4), false, 100000); r3 = new MockRequest(createRequest(3, 6), true, 1000000); r4 = new MockRequest(createRequest(4, 8), false, 1000000); r5 = new MockRequest(createRequest(5, 10), true, 1000000); - queue = new BoundedPriorityBlockingQueue( + queue = new EvictingPriorityBlockingQueue( new TaskExecutorService.WaitQueueComparator(), 4); - queue.offer(r1); + assertNull(queue.offer(r1)); assertEquals(r1, queue.peek()); - queue.offer(r2); + assertNull(queue.offer(r2)); assertEquals(r1, queue.peek()); - queue.offer(r3); + assertNull(queue.offer(r3)); assertEquals(r1, queue.peek()); - queue.offer(r4); + assertNull(queue.offer(r4)); assertEquals(r1, queue.peek()); - assertEquals(false, queue.offer(r5)); + // offer accepted and r4 gets evicted + assertEquals(r4, queue.offer(r5)); assertEquals(r1, queue.take()); assertEquals(r3, queue.take()); + assertEquals(r5, queue.take()); assertEquals(r2, queue.take()); - assertEquals(r4, queue.take()); r1 = new MockRequest(createRequest(1, 2), true, 100000); r2 = new MockRequest(createRequest(2, 4), false, 100000); r3 = new MockRequest(createRequest(3, 6), false, 1000000); r4 = new MockRequest(createRequest(4, 8), false, 1000000); r5 = new MockRequest(createRequest(5, 10), true, 1000000); - queue = new BoundedPriorityBlockingQueue( + queue = new EvictingPriorityBlockingQueue( new TaskExecutorService.WaitQueueComparator(), 4); - queue.offer(r1); + assertNull(queue.offer(r1)); assertEquals(r1, queue.peek()); - queue.offer(r2); + assertNull(queue.offer(r2)); assertEquals(r1, queue.peek()); - queue.offer(r3); + assertNull(queue.offer(r3)); assertEquals(r1, queue.peek()); - queue.offer(r4); + assertNull(queue.offer(r4)); assertEquals(r1, queue.peek()); - assertEquals(false, queue.offer(r5)); + // offer accepted and r4 gets evicted + assertEquals(r4, queue.offer(r5)); assertEquals(r1, queue.take()); + assertEquals(r5, queue.take()); assertEquals(r2, queue.take()); assertEquals(r3, queue.take()); - assertEquals(r4, queue.take()); r1 = new MockRequest(createRequest(1, 2), false, 100000); r2 = new MockRequest(createRequest(2, 4), true, 100000); r3 = new MockRequest(createRequest(3, 6), true, 1000000); r4 = new MockRequest(createRequest(4, 8), true, 1000000); r5 = new MockRequest(createRequest(5, 10), true, 1000000); - queue = new BoundedPriorityBlockingQueue( + queue = new EvictingPriorityBlockingQueue( new TaskExecutorService.WaitQueueComparator(), 4); - queue.offer(r1); + assertNull(queue.offer(r1)); assertEquals(r1, queue.peek()); - queue.offer(r2); + assertNull(queue.offer(r2)); assertEquals(r2, queue.peek()); - queue.offer(r3); + assertNull(queue.offer(r3)); assertEquals(r2, queue.peek()); - queue.offer(r4); + assertNull(queue.offer(r4)); assertEquals(r2, queue.peek()); - assertEquals(false, queue.offer(r5)); + // offer accepted, r1 evicted + assertEquals(r1, queue.offer(r5)); assertEquals(r2, queue.take()); assertEquals(r3, queue.take()); assertEquals(r4, queue.take()); - assertEquals(r1, queue.take()); + assertEquals(r5, queue.take()); } @Test