Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java (revision 0) @@ -0,0 +1,626 @@ +/* +* Copyright 2010 The Apache Software Foundation +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +package org.apache.hadoop.hbase.regionserver; + +import junit.framework.TestCase; +import java.io.IOException; +import org.apache.hadoop.hbase.regionserver.CompactSplitThread.Priority; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** +* Test class for the priority compaction queue +*/ +public class TestPriorityCompactionQueue extends TestCase { + static final Log LOG = LogFactory.getLog(TestPriorityCompactionQueue.class); + + @Override + public void setUp() { + } + + @Override + public void tearDown() { + + } + + class DummyHRegion extends HRegion { + String name; + DummyHRegion(String name) { + super(); + this.name = name; + } + + public int hashCode() { + return name.hashCode(); + } + + public boolean equals(DummyHRegion r) { + return name.equals(r.name); + } + + public String toString() { + return "[DummyHRegion "+name+"]"; + } + + public String getRegionNameAsString() { + return name; + } + } + + protected void getAndCheckRegion(PriorityCompactionQueue pq, HRegion checkRegion) + { + HRegion r = pq.remove(); + if (r != checkRegion) { + assertTrue("Didn't get expected "+checkRegion+" got "+r, r.equals(checkRegion)); + } + } + + protected void addRegion(PriorityCompactionQueue pq, HRegion r, Priority p) + { + pq.add(r, p); + try + { + //Sleep 10 millisecond so 2 things are not put in the queue within the + //same millisecond. The queue breaks ties arbitrarily between two + //requests inserted at the same time. We want the ordering to + //be consistent for our unit test. + Thread.sleep(1); + } + catch (InterruptedException ex) + { + //continue + } + } + + ////////////////////////////////////////////////////////////////////////////// + // tests + ////////////////////////////////////////////////////////////////////////////// + + /** tests general functionality of the compaction queue */ + public void testPriorityQueue() throws InterruptedException { + PriorityCompactionQueue pq = new PriorityCompactionQueue(); + + HRegion r1 = new DummyHRegion("r1"); + HRegion r2 = new DummyHRegion("r2"); + HRegion r3 = new DummyHRegion("r3"); + HRegion r4 = new DummyHRegion("r4"); + HRegion r5 = new DummyHRegion("r5"); + + //test 1 + //check fifo w/priority + addRegion(pq, r1, Priority.HIGH_BLOCKING); + addRegion(pq, r2, Priority.HIGH_BLOCKING); + addRegion(pq, r3, Priority.HIGH_BLOCKING); + addRegion(pq, r4, Priority.HIGH_BLOCKING); + addRegion(pq, r5, Priority.HIGH_BLOCKING); + + getAndCheckRegion(pq, r1); + getAndCheckRegion(pq, r2); + getAndCheckRegion(pq, r3); + getAndCheckRegion(pq, r4); + getAndCheckRegion(pq, r5); + + //test 2 + //check fifo + addRegion(pq, r1, null); + addRegion(pq, r2, null); + addRegion(pq, r3, null); + addRegion(pq, r4, null); + addRegion(pq, r5, null); + + getAndCheckRegion(pq, r1); + getAndCheckRegion(pq, r2); + getAndCheckRegion(pq, r3); + getAndCheckRegion(pq, r4); + getAndCheckRegion(pq, r5); + + //test 3 + //check fifo w/mixed priority + addRegion(pq, r1, Priority.HIGH_BLOCKING); + addRegion(pq, r2, Priority.NORMAL); + addRegion(pq, r3, Priority.HIGH_BLOCKING); + addRegion(pq, r4, Priority.NORMAL); + addRegion(pq, r5, Priority.HIGH_BLOCKING); + + getAndCheckRegion(pq, r1); + getAndCheckRegion(pq, r3); + getAndCheckRegion(pq, r5); + getAndCheckRegion(pq, r2); + getAndCheckRegion(pq, r4); + + //test 4 + //check fifo w/mixed priority + addRegion(pq, r1, Priority.NORMAL); + addRegion(pq, r2, Priority.NORMAL); + addRegion(pq, r3, Priority.NORMAL); + addRegion(pq, r4, Priority.NORMAL); + addRegion(pq, r5, Priority.HIGH_BLOCKING); + + getAndCheckRegion(pq, r5); + getAndCheckRegion(pq, r1); + getAndCheckRegion(pq, r2); + getAndCheckRegion(pq, r3); + getAndCheckRegion(pq, r4); + + //test 5 + //check fifo w/mixed priority elevation time + addRegion(pq, r1, Priority.NORMAL); + addRegion(pq, r2, Priority.HIGH_BLOCKING); + addRegion(pq, r3, Priority.NORMAL); + Thread.sleep(1000); + addRegion(pq, r4, Priority.NORMAL); + addRegion(pq, r5, Priority.HIGH_BLOCKING); + + getAndCheckRegion(pq, r2); + getAndCheckRegion(pq, r5); + getAndCheckRegion(pq, r1); + getAndCheckRegion(pq, r3); + getAndCheckRegion(pq, r4); + + //reset the priority compaction queue back to a normal queue + pq = new PriorityCompactionQueue(); + + //test 7 + //test that lower priority are removed from the queue when a high priority + //is added + addRegion(pq, r1, Priority.NORMAL); + addRegion(pq, r2, Priority.NORMAL); + addRegion(pq, r3, Priority.NORMAL); + addRegion(pq, r4, Priority.NORMAL); + addRegion(pq, r5, Priority.NORMAL); + addRegion(pq, r3, Priority.HIGH_BLOCKING); + + getAndCheckRegion(pq, r3); + getAndCheckRegion(pq, r1); + getAndCheckRegion(pq, r2); + getAndCheckRegion(pq, r4); + getAndCheckRegion(pq, r5); + + assertTrue("Queue should be empty.", pq.size() == 0); + + //test 8 + //don't add the same region more than once + addRegion(pq, r1, Priority.NORMAL); + addRegion(pq, r2, Priority.NORMAL); + addRegion(pq, r3, Priority.NORMAL); + addRegion(pq, r4, Priority.NORMAL); + addRegion(pq, r5, Priority.NORMAL); + addRegion(pq, r1, Priority.NORMAL); + addRegion(pq, r2, Priority.NORMAL); + addRegion(pq, r3, Priority.NORMAL); + addRegion(pq, r4, Priority.NORMAL); + addRegion(pq, r5, Priority.NORMAL); + + getAndCheckRegion(pq, r1); + getAndCheckRegion(pq, r2); + getAndCheckRegion(pq, r3); + getAndCheckRegion(pq, r4); + getAndCheckRegion(pq, r5); + + assertTrue("Queue should be empty.", pq.size() == 0); + } +} +\ No newline at end of file +Index: src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java +=================================================================== +--- src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java (revision 0) +@@ -%ld,%ld +%ld,%ld @@ +/** +* Copyright 2008 The Apache Software Foundation +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.hbase.regionserver; + +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.TimeUnit; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.regionserver.CompactSplitThread.Priority; + +/** This class delegates to the BlockingQueue but wraps all HRegions in +* compaction requests that hold the priority and the date requested. +* +* Implementation Note: With an elevation time of -1 there is the potential for +* starvation of the lower priority compaction requests as long as there is a +* constant stream of high priority requests. +*/ +public class PriorityCompactionQueue implements BlockingQueue { + + //////////////////////////////////////////////////////////////////////////// + // Utility Classes + + /** This class represents a compaction request and holds the region, priority, + * and time submitted. + */ + private class CompactionRequest implements Comparable { + + private HRegion r; + private Priority p; + private Date date; + + public CompactionRequest(HRegion r) { + this(r, null, null); + } + + public CompactionRequest(HRegion r, Priority p) { + this(r, p, null); + } + + public CompactionRequest(HRegion r, Priority p, Date d) { + if (r == null) { + throw new NullPointerException("HRegion cannot be null"); + } + + if (p == null) { + p = Priority.NORMAL; //the default priority + } + + if (d == null) { + d = new Date(); + } + + this.r = r; + this.p = p; + this.date = d; + } + + /** This function will define where in the priority queue a the request will + * end up. Those with the highest priorities will be first. When the + * priorities are the same it will It will first compare priority then date to + * maintain a FIFO functionality. + * + * Note: The date is only accurate to the millisecond which means it is + * possible that two requests were inserted into the queue within a + * millisecond. When that is the case this function will break the tie + * arbitrarily. + */ + @Override + public int compareTo(CompactionRequest request) { + //NOTE: The head of the priority queue is the least element + + if (this.equals(request)) { + return 0; //they are the same request + } + int compareVal; + + compareVal = p.compareTo(request.p); //compare priority + if (compareVal != 0) { + return compareVal; + } + + compareVal = date.compareTo(request.date); + if (compareVal != 0) { + return compareVal; + } + + //break the tie arbitrarily + return -1; + } + + /** Gets the HRegion for the request */ + HRegion getHRegion() { + return r; + } + + /** Gets the priority for the request */ + Priority getPriority() { + return p; + } + + public String toString() { + return "[CompactionRequest: " + r.getRegionNameAsString() + ", " + p + ", " + date + "]"; + } + } + + //////////////////////////////////////////////////////////////////////////// + // Class Definition and Methods + + static final Log LOG = LogFactory.getLog(PriorityCompactionQueue.class); + + /** The actual blocking queue we delegate to */ + protected BlockingQueue queue = + new PriorityBlockingQueue(); + /** Hash map of the HRegions contained within the Compaction Queue */ + private final HashMap regionsInQueue = + new HashMap(); + + /** Creates a new PriorityCompactionQueue with no priority elevation time */ + public PriorityCompactionQueue() { + LOG.debug("Create PriorityCompactionQueue"); + } + + /** If the region is not already in the queue it will add it and return a + * new compaction request object. If it is already present in the queue + * then it will return null. + * @param p If null it will use the default priority + * @return returns a compaction request if it isn't already in the queue + */ + protected CompactionRequest addToRegionsInQueue(HRegion r, Priority p) { + CompactionRequest queuedRequest = null; + CompactionRequest newRequest = new CompactionRequest(r, p); + synchronized (regionsInQueue) { + queuedRequest = regionsInQueue.get(r); + if (queuedRequest == null || newRequest.getPriority().compareTo(queuedRequest.getPriority()) < 0) { + LOG.trace("Inserting region in queue. "+newRequest); + regionsInQueue.put(r, newRequest); + } else { + LOG.trace("Region already in queue. queued: "+queuedRequest+", requested: "+newRequest); + newRequest = null; //it is already present so don't add it + } + } + + if (newRequest != null && queuedRequest != null) { + //remove the lower priority request + queue.remove(queuedRequest); + } + + return newRequest; + } + + /** Removes the request from the regions in queue + * @param p If null it will use the default priority + */ + protected CompactionRequest removeFromRegionsInQueue(HRegion r) { + if (r == null) + return null; + + synchronized (regionsInQueue) { + CompactionRequest cr = regionsInQueue.remove(r); + if (cr == null) { + LOG.warn("Removed a region it couldn't find in regionsInQueue: "+r); + } + return cr; + } + } + + public boolean add(HRegion e, Priority p) { + CompactionRequest request = this.addToRegionsInQueue(e, p); + if (request != null) { + boolean result = queue.add(request); + CompactionRequest cr = queue.peek(); + return result; + } else { + return false; + } + } + + @Override + public boolean add(HRegion e) { + return add(e, null); + } + + public boolean offer(HRegion e, Priority p) { + CompactionRequest request = this.addToRegionsInQueue(e, p); + if (request != null) { + return queue.offer(request); + } else { + return false; + } + } + + @Override + public boolean offer(HRegion e) { + return offer(e, null); + } + + public void put(HRegion e, Priority p) throws InterruptedException { + CompactionRequest request = this.addToRegionsInQueue(e, p); + if (request != null) { + queue.put(request); + } + } + + @Override + public void put(HRegion e) throws InterruptedException { + put(e, null); + } + + public boolean offer(HRegion e, Priority p, long timeout, TimeUnit unit) throws InterruptedException { + CompactionRequest request = this.addToRegionsInQueue(e, p); + if (request != null) { + return queue.offer(request, timeout, unit); + } else { + return false; + } + } + + @Override + public boolean offer(HRegion e, long timeout, TimeUnit unit) throws InterruptedException { + return offer(e, null, timeout, unit); + } + + @Override + public HRegion take() throws InterruptedException { + CompactionRequest cr = queue.take(); + if (cr != null) { + removeFromRegionsInQueue(cr.getHRegion()); + return cr.getHRegion(); + } else { + return null; + } + } + + @Override + public HRegion poll(long timeout, TimeUnit unit) throws InterruptedException { + CompactionRequest cr = queue.poll(timeout, unit); + if (cr != null) { + removeFromRegionsInQueue(cr.getHRegion()); + return cr.getHRegion(); + } else { + return null; + } + } + + @Override + public boolean remove(Object r) { + if (r instanceof HRegion) { + CompactionRequest cr = removeFromRegionsInQueue((HRegion) r); + if (cr != null) { + return queue.remove(cr); + } + } + + return false; + } + + @Override + public HRegion remove() { + CompactionRequest cr = queue.remove(); + if (cr != null) { + removeFromRegionsInQueue(cr.getHRegion()); + return cr.getHRegion(); + } else { + return null; + } + } + + @Override + public HRegion poll() { + CompactionRequest cr = queue.poll(); + if (cr != null) { + removeFromRegionsInQueue(cr.getHRegion()); + return cr.getHRegion(); + } else { + return null; + } + } + + @Override + public int remainingCapacity() { + return queue.remainingCapacity(); + } + + @Override + public boolean contains(Object r) { + if (r instanceof HRegion) { + synchronized (regionsInQueue) { + return regionsInQueue.containsKey((HRegion) r); + } + } else if (r instanceof CompactionRequest) { + return queue.contains(r); + } + + return false; + } + + @Override + public HRegion element() { + CompactionRequest cr = queue.element(); + if (cr != null) { + return cr.getHRegion(); + } else { + return null; + } + } + + @Override + public HRegion peek() { + CompactionRequest cr = queue.peek(); + if (cr != null) { + return cr.getHRegion(); + } else { + return null; + } + } + + @Override + public int size() { + return queue.size(); + } + + @Override + public boolean isEmpty() { + return queue.isEmpty(); + } + + @Override + public void clear() { + regionsInQueue.clear(); + queue.clear(); + } + + // Unimplemented methods, collection methods + + @Override + public Iterator iterator() { + throw new UnsupportedOperationException("Not supported."); + } + + @Override + public Object[] toArray() { + throw new UnsupportedOperationException("Not supported."); + } + + @Override + public T[] toArray(T[] a) { + throw new UnsupportedOperationException("Not supported."); + } + + @Override + public boolean containsAll(Collection c) { + throw new UnsupportedOperationException("Not supported."); + } + + @Override + public boolean addAll(Collection c) { + throw new UnsupportedOperationException("Not supported."); + } + + @Override + public boolean removeAll(Collection c) { + throw new UnsupportedOperationException("Not supported."); + } + + @Override + public boolean retainAll(Collection c) { + throw new UnsupportedOperationException("Not supported."); + } + + @Override + public int drainTo(Collection c) { + throw new UnsupportedOperationException("Not supported."); + } + + @Override + public int drainTo(Collection c, int maxElements) { + throw new UnsupportedOperationException("Not supported."); + } +} Index: src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java (revision 0) @@ -0,0 +1,393 @@ +/** +* Copyright 2010 The Apache Software Foundation +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.hbase.regionserver; + +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.TimeUnit; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.regionserver.CompactSplitThread.Priority; + +/** + * This class delegates to the BlockingQueue but wraps all HRegions in + * compaction requests that hold the priority and the date requested. + * + * Implementation Note: With an elevation time of -1 there is the potential for + * starvation of the lower priority compaction requests as long as there is a + * constant stream of high priority requests. + */ +public class PriorityCompactionQueue implements BlockingQueue { + /** + * This class represents a compaction request and holds the region, priority, + * and time submitted. + */ + private class CompactionRequest implements Comparable { + private final HRegion r; + private final Priority p; + private final Date date; + + public CompactionRequest(HRegion r, Priority p) { + this(r, p, null); + } + + public CompactionRequest(HRegion r, Priority p, Date d) { + if (r == null) { + throw new NullPointerException("HRegion cannot be null"); + } + + if (p == null) { + p = Priority.NORMAL; //the default priority + } + + if (d == null) { + d = new Date(); + } + + this.r = r; + this.p = p; + this.date = d; + } + + /** + * This function will define where in the priority queue the request will + * end up. Those with the highest priorities will be first. When the + * priorities are the same it will It will first compare priority then date + * to maintain a FIFO functionality. + * + *

Note: The date is only accurate to the millisecond which means it is + * possible that two requests were inserted into the queue within a + * millisecond. When that is the case this function will break the tie + * arbitrarily. + */ + @Override + public int compareTo(CompactionRequest request) { + //NOTE: The head of the priority queue is the least element + if (this.equals(request)) { + return 0; //they are the same request + } + int compareVal; + + compareVal = p.compareTo(request.p); //compare priority + if (compareVal != 0) { + return compareVal; + } + + compareVal = date.compareTo(request.date); + if (compareVal != 0) { + return compareVal; + } + + //break the tie arbitrarily + return -1; + } + + /** Gets the HRegion for the request */ + HRegion getHRegion() { + return r; + } + + /** Gets the priority for the request */ + Priority getPriority() { + return p; + } + + public String toString() { + return "regionName=" + r.getRegionNameAsString() + + ", priority=" + p + ", date=" + date; + } + } + + static final Log LOG = LogFactory.getLog(PriorityCompactionQueue.class); + + /** The actual blocking queue we delegate to */ + protected BlockingQueue queue = + new PriorityBlockingQueue(); + /** Hash map of the HRegions contained within the Compaction Queue */ + private final HashMap regionsInQueue = + new HashMap(); + + /** Creates a new PriorityCompactionQueue with no priority elevation time */ + public PriorityCompactionQueue() { + LOG.debug("Create PriorityCompactionQueue"); + } + + /** If the region is not already in the queue it will add it and return a + * new compaction request object. If it is already present in the queue + * then it will return null. + * @param p If null it will use the default priority + * @return returns a compaction request if it isn't already in the queue + */ + protected CompactionRequest addToRegionsInQueue(HRegion r, Priority p) { + CompactionRequest queuedRequest = null; + CompactionRequest newRequest = new CompactionRequest(r, p); + synchronized (regionsInQueue) { + queuedRequest = regionsInQueue.get(r); + if (queuedRequest == null || newRequest.getPriority().compareTo(queuedRequest.getPriority()) < 0) { + LOG.trace("Inserting region in queue. " + newRequest); + regionsInQueue.put(r, newRequest); + } else { + LOG.trace("Region already in queue. queued: " + queuedRequest + + ", requested: "+newRequest); + newRequest = null; //it is already present so don't add it + } + } + + if (newRequest != null && queuedRequest != null) { + //remove the lower priority request + queue.remove(queuedRequest); + } + + return newRequest; + } + + /** Removes the request from the regions in queue + * @param p If null it will use the default priority + */ + protected CompactionRequest removeFromRegionsInQueue(HRegion r) { + if (r == null) + return null; + + synchronized (regionsInQueue) { + CompactionRequest cr = regionsInQueue.remove(r); + if (cr == null) { + LOG.warn("Removed a region it couldn't find in regionsInQueue: " + r); + } + return cr; + } + } + + public boolean add(HRegion e, Priority p) { + CompactionRequest request = this.addToRegionsInQueue(e, p); + if (request != null) { + boolean result = queue.add(request); + CompactionRequest cr = queue.peek(); + return result; + } else { + return false; + } + } + + @Override + public boolean add(HRegion e) { + return add(e, null); + } + + public boolean offer(HRegion e, Priority p) { + CompactionRequest request = this.addToRegionsInQueue(e, p); + if (request != null) { + return queue.offer(request); + } else { + return false; + } + } + + @Override + public boolean offer(HRegion e) { + return offer(e, null); + } + + public void put(HRegion e, Priority p) throws InterruptedException { + CompactionRequest request = this.addToRegionsInQueue(e, p); + if (request != null) { + queue.put(request); + } + } + + @Override + public void put(HRegion e) throws InterruptedException { + put(e, null); + } + + public boolean offer(HRegion e, Priority p, long timeout, TimeUnit unit) throws InterruptedException { + CompactionRequest request = this.addToRegionsInQueue(e, p); + if (request != null) { + return queue.offer(request, timeout, unit); + } else { + return false; + } + } + + @Override + public boolean offer(HRegion e, long timeout, TimeUnit unit) throws InterruptedException { + return offer(e, null, timeout, unit); + } + + @Override + public HRegion take() throws InterruptedException { + CompactionRequest cr = queue.take(); + if (cr != null) { + removeFromRegionsInQueue(cr.getHRegion()); + return cr.getHRegion(); + } else { + return null; + } + } + + @Override + public HRegion poll(long timeout, TimeUnit unit) throws InterruptedException { + CompactionRequest cr = queue.poll(timeout, unit); + if (cr != null) { + removeFromRegionsInQueue(cr.getHRegion()); + return cr.getHRegion(); + } else { + return null; + } + } + + @Override + public boolean remove(Object r) { + if (r instanceof HRegion) { + CompactionRequest cr = removeFromRegionsInQueue((HRegion) r); + if (cr != null) { + return queue.remove(cr); + } + } + + return false; + } + + @Override + public HRegion remove() { + CompactionRequest cr = queue.remove(); + if (cr != null) { + removeFromRegionsInQueue(cr.getHRegion()); + return cr.getHRegion(); + } else { + return null; + } + } + + @Override + public HRegion poll() { + CompactionRequest cr = queue.poll(); + if (cr != null) { + removeFromRegionsInQueue(cr.getHRegion()); + return cr.getHRegion(); + } else { + return null; + } + } + + @Override + public int remainingCapacity() { + return queue.remainingCapacity(); + } + + @Override + public boolean contains(Object r) { + if (r instanceof HRegion) { + synchronized (regionsInQueue) { + return regionsInQueue.containsKey((HRegion) r); + } + } else if (r instanceof CompactionRequest) { + return queue.contains(r); + } + + return false; + } + + @Override + public HRegion element() { + CompactionRequest cr = queue.element(); + if (cr != null) { + return cr.getHRegion(); + } else { + return null; + } + } + + @Override + public HRegion peek() { + CompactionRequest cr = queue.peek(); + if (cr != null) { + return cr.getHRegion(); + } else { + return null; + } + } + + @Override + public int size() { + return queue.size(); + } + + @Override + public boolean isEmpty() { + return queue.isEmpty(); + } + + @Override + public void clear() { + regionsInQueue.clear(); + queue.clear(); + } + + // Unimplemented methods, collection methods + + @Override + public Iterator iterator() { + throw new UnsupportedOperationException("Not supported."); + } + + @Override + public Object[] toArray() { + throw new UnsupportedOperationException("Not supported."); + } + + @Override + public T[] toArray(T[] a) { + throw new UnsupportedOperationException("Not supported."); + } + + @Override + public boolean containsAll(Collection c) { + throw new UnsupportedOperationException("Not supported."); + } + + @Override + public boolean addAll(Collection c) { + throw new UnsupportedOperationException("Not supported."); + } + + @Override + public boolean removeAll(Collection c) { + throw new UnsupportedOperationException("Not supported."); + } + + @Override + public boolean retainAll(Collection c) { + throw new UnsupportedOperationException("Not supported."); + } + + @Override + public int drainTo(Collection c) { + throw new UnsupportedOperationException("Not supported."); + } + + @Override + public int drainTo(Collection c, int maxElements) { + throw new UnsupportedOperationException("Not supported."); + } +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (revision 1001955) +++ src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (working copy) @@ -212,7 +212,8 @@ LOG.warn("Region " + region.getRegionNameAsString() + " has too many " + "store files; delaying flush up to " + this.blockingWaitTime + "ms"); } - this.server.compactSplitThread.requestCompaction(region, getName()); + this.server.compactSplitThread.requestCompaction(region, getName(), + CompactSplitThread.Priority.HIGH_BLOCKING); // Put back on the queue. Have it come back out of the queue // after a delay of this.blockingWaitTime / 100 ms. this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100)); Index: src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (revision 1001955) +++ src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (working copy) @@ -20,9 +20,6 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; -import java.util.HashSet; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; @@ -43,11 +40,37 @@ private final HRegionServer server; private final Configuration conf; - private final BlockingQueue compactionQueue = - new LinkedBlockingQueue(); + private final PriorityCompactionQueue compactionQueue = + new PriorityCompactionQueue(); - private final HashSet regionsInQueue = new HashSet(); + /** The priorities for a compaction request. */ + public enum Priority implements Comparable { + //NOTE: All priorities should be numbered consecutively starting with 1. + //The highest priority should be 1 followed by all lower priorities. + //Priorities can be changed at anytime without requiring any changes to the + //queue. + /** HIGH_BLOCKING should only be used when an operation is blocked until a + * compact / split is done (e.g. a MemStore can't flush because it has + * "too many store files" and is blocking until a compact / split is done) + */ + HIGH_BLOCKING(1), + /** A normal compaction / split request */ + NORMAL(2), + /** A low compaction / split request -- not currently used */ + LOW(3); + + int value; + + Priority(int value) { + this.value = value; + } + + int getInt() { + return value; + } + } + /** * Splitting should not take place if the total number of regions exceed this. * This is not a hard limit to the number of regions but it is a guideline to @@ -74,9 +97,6 @@ try { r = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS); if (r != null && !this.server.isStopped()) { - synchronized (regionsInQueue) { - regionsInQueue.remove(r); - } lock.lock(); try { // Don't interrupt us while we are working @@ -107,23 +127,32 @@ } } } - regionsInQueue.clear(); compactionQueue.clear(); LOG.info(getName() + " exiting"); } public synchronized void requestCompaction(final HRegion r, final String why) { - requestCompaction(r, false, why); + requestCompaction(r, false, why, Priority.NORMAL); } + public synchronized void requestCompaction(final HRegion r, + final String why, Priority p) { + requestCompaction(r, false, why, p); + } + + public synchronized void requestCompaction(final HRegion r, + final boolean force, final String why) { + requestCompaction(r, force, why, Priority.NORMAL); + } + /** * @param r HRegion store belongs to * @param force Whether next compaction should be major * @param why Why compaction requested -- used in debug messages */ public synchronized void requestCompaction(final HRegion r, - final boolean force, final String why) { + final boolean force, final String why, Priority priority) { if (this.server.isStopped()) { return; } @@ -131,14 +160,10 @@ if (LOG.isDebugEnabled()) { LOG.debug("Compaction " + (force? "(major) ": "") + "requested for region " + r.getRegionNameAsString() + - (why != null && !why.isEmpty()? " because: " + why: "")); + (why != null && !why.isEmpty()? " because: " + why: "") + + "; Priority: " + priority + "; Compaction queue size: " + compactionQueue.size()); } - synchronized (regionsInQueue) { - if (!regionsInQueue.contains(r)) { - compactionQueue.add(r); - regionsInQueue.add(r); - } - } + compactionQueue.add(r, priority); } private void split(final HRegion parent, final byte [] midKey)