diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AvlUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AvlUtil.java new file mode 100644 index 0000000..7adfc5d --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AvlUtil.java @@ -0,0 +1,559 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.util; + +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Helper class that allows to create and manipulate an AvlTree. + * The main utility is in cases where over time we have a lot of add/remove of the same object, + * and we want to avoid all the allocations/deallocations of the "node" objects that the + * java containers will create. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public final class AvlUtil { + private AvlUtil() {} + + /** + * This class represent a node that will be used in an AvlTree. + * Instead of creating another object for the tree node, + * like the TreeMap and the other java contains, here the node can be extended + * and the content can be embedded directly in the node itself. + * This is useful in cases where over time we have a lot of add/remove of the same object. + */ + @InterfaceAudience.Private + public static abstract class AvlNode { + protected TNode avlLeft; + protected TNode avlRight; + protected int avlHeight; + + public abstract int compareTo(TNode other); + } + + /** + * This class extends the AvlNode and adds two links that will be used in conjunction + * with the AvlIterableList class. + * This is useful in situations where your node must be in a map to have a quick lookup by key, + * but it also require to be in something like a list/queue. + * This is useful in cases where over time we have a lot of add/remove of the same object. + */ + @InterfaceAudience.Private + public static abstract class AvlLinkedNode extends AvlNode { + protected TNode iterNext = null; + protected TNode iterPrev = null; + } + + @InterfaceAudience.Private + public interface AvlInsertOrReplace { + TNode insert(Object searchKey); + TNode replace(Object searchKey, TNode prevNode); + } + + /** + * The AvlTree allows to lookup an object using a custom key. + * e.g. the java Map allows only to lookup by key using the Comparator + * specified in the constructor. + * In this case you can pass a specific comparator for every needs. + */ + @InterfaceAudience.Private + public static interface AvlKeyComparator { + int compareKey(TNode node, Object key); + } + + /** + * Visitor that allows to traverse a set of AvlNodes. + * If you don't like the callback style of the visitor you can always use the AvlTreeIterator. + */ + @InterfaceAudience.Private + public static interface AvlNodeVisitor { + /** + * @param node the node that we are currently visiting + * @return false to stop the iteration. true to continue. + */ + boolean visitNode(TNode node); + } + + /** + * Helper class that allows to create and manipulate an AVL Tree + */ + @InterfaceAudience.Private + public static class AvlTree { + /** + * @param root the current root of the tree + * @param key the key for the node we are trying to find + * @param keyComparator the comparator to use to match node and key + * @return the node that matches the specified key or null in case of node not found. + */ + public static TNode get(TNode root, final Object key, + final AvlKeyComparator keyComparator) { + while (root != null) { + int cmp = keyComparator.compareKey(root, key); + if (cmp > 0) { + root = (TNode)root.avlLeft; + } else if (cmp < 0) { + root = (TNode)root.avlRight; + } else { + return (TNode)root; + } + } + return null; + } + + /** + * @param root the current root of the tree + * @return the first (min) node of the tree + */ + public static TNode getFirst(TNode root) { + if (root != null) { + while (root.avlLeft != null) { + root = (TNode)root.avlLeft; + } + } + return root; + } + + /** + * @param root the current root of the tree + * @return the last (max) node of the tree + */ + public static TNode getLast(TNode root) { + if (root != null) { + while (root.avlRight != null) { + root = (TNode)root.avlRight; + } + } + return root; + } + + /** + * Insert a node into the tree. It uses the AvlNode.compareTo() for ordering. + * NOTE: The node must not be already in the tree. + * @param root the current root of the tree + * @param node the node to insert + * @return the new root of the tree + */ + public static TNode insert(TNode root, TNode node) { + if (root == null) return node; + + int cmp = node.compareTo(root); + assert cmp != 0 : "node already inserted: " + root; + if (cmp < 0) { + root.avlLeft = insert(root.avlLeft, node); + } else { + root.avlRight = insert(root.avlRight, node); + } + return balance(root); + } + + /** + * Insert a node into the tree. + * This is useful when you want to create a new node or replace the content + * depending if the node already exists or not. + * Using AvlInsertOrReplace class you can return the node to add/replace. + * + * @param root the current root of the tree + * @param key the key for the node we are trying to insert + * @param keyComparator the comparator to use to match node and key + * @param insertOrReplace the class to use to insert or replace the node + * @return the new root of the tree + */ + public static TNode insert(TNode root, Object key, + final AvlKeyComparator keyComparator, + final AvlInsertOrReplace insertOrReplace) { + if (root == null) { + return insertOrReplace.insert(key); + } + + int cmp = keyComparator.compareKey(root, key); + if (cmp < 0) { + root.avlLeft = insert((TNode)root.avlLeft, key, keyComparator, insertOrReplace); + } else if (cmp > 0) { + root.avlRight = insert((TNode)root.avlRight, key, keyComparator, insertOrReplace); + } else { + TNode left = (TNode)root.avlLeft; + TNode right = (TNode)root.avlRight; + root = insertOrReplace.replace(key, root); + root.avlLeft = left; + root.avlRight = right; + return root; + } + return balance(root); + } + + private static TNode removeMin(TNode p) { + if (p.avlLeft == null) + return (TNode)p.avlRight; + p.avlLeft = removeMin(p.avlLeft); + return balance(p); + } + + /** + * Removes the node matching the specified key from the tree + * @param root the current root of the tree + * @param key the key for the node we are trying to find + * @param keyComparator the comparator to use to match node and key + * @return the new root of the tree + */ + public static TNode remove(TNode root, Object key, + final AvlKeyComparator keyComparator) { + return remove(root, key, keyComparator, null); + } + + /** + * Removes the node matching the specified key from the tree + * @param root the current root of the tree + * @param key the key for the node we are trying to find + * @param keyComparator the comparator to use to match node and key + * @param removed will be set to true if the node was found and removed, otherwise false + * @return the new root of the tree + */ + public static TNode remove(TNode root, Object key, + final AvlKeyComparator keyComparator, final AtomicBoolean removed) { + if (root == null) return null; + + int cmp = keyComparator.compareKey(root, key); + if (cmp == 0) { + if (removed != null) removed.set(true); + + TNode q = (TNode)root.avlLeft; + TNode r = (TNode)root.avlRight; + if (r == null) return q; + TNode min = getFirst(r); + min.avlRight = removeMin(r); + min.avlLeft = q; + return balance(min); + } else if (cmp > 0) { + root.avlLeft = remove((TNode)root.avlLeft, key, keyComparator); + } else /* if (cmp < 0) */ { + root.avlRight = remove((TNode)root.avlRight, key, keyComparator); + } + return balance(root); + } + + /** + * Visit each node of the tree + * @param root the current root of the tree + * @param visitor the AvlNodeVisitor instance + */ + public static void visit(final TNode root, + final AvlNodeVisitor visitor) { + if (root == null) return; + + final AvlTreeIterator iterator = new AvlTreeIterator(root); + boolean visitNext = true; + while (visitNext && iterator.hasNext()) { + visitNext = visitor.visitNode(iterator.next()); + } + } + + private static TNode balance(TNode p) { + fixHeight(p); + int balance = balanceFactor(p); + if (balance == 2) { + if (balanceFactor(p.avlRight) < 0) { + p.avlRight = rotateRight(p.avlRight); + } + return rotateLeft(p); + } else if (balance == -2) { + if (balanceFactor(p.avlLeft) > 0) { + p.avlLeft = rotateLeft(p.avlLeft); + } + return rotateRight(p); + } + return p; + } + + private static TNode rotateRight(TNode p) { + TNode q = (TNode)p.avlLeft; + p.avlLeft = q.avlRight; + q.avlRight = p; + fixHeight(p); + fixHeight(q); + return q; + } + + private static TNode rotateLeft(TNode q) { + TNode p = (TNode)q.avlRight; + q.avlRight = p.avlLeft; + p.avlLeft = q; + fixHeight(q); + fixHeight(p); + return p; + } + + private static void fixHeight(TNode node) { + final int heightLeft = height(node.avlLeft); + final int heightRight = height(node.avlRight); + node.avlHeight = 1 + Math.max(heightLeft, heightRight); + } + + private static int height(TNode node) { + return node != null ? node.avlHeight : 0; + } + + private static int balanceFactor(TNode node) { + return height(node.avlRight) - height(node.avlLeft); + } + } + + /** + * Iterator for the AvlTree + */ + @InterfaceAudience.Private + public static class AvlTreeIterator implements Iterator { + private final Object[] stack = new Object[32]; + + private TNode current = null; + private int height = 0; + + public AvlTreeIterator() { + } + + /** + * Create the iterator starting from the first (min) node of the tree + */ + public AvlTreeIterator(final TNode root) { + seekFirst(root); + } + + /** + * Create the iterator starting from the specified key + * @param root the current root of the tree + * @param key the key for the node we are trying to find + * @param keyComparator the comparator to use to match node and key + */ + public AvlTreeIterator(final TNode root, final Object key, + final AvlKeyComparator keyComparator) { + seekTo(root, key, keyComparator); + } + + @Override + public boolean hasNext() { + return current != null; + } + + @Override + public TNode next() { + final TNode node = this.current; + seekNext(); + return node; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + /** + * Reset the iterator, and seeks to the first (min) node of the tree + * @param root the current root of the tree + */ + public void seekFirst(final TNode root) { + current = root; + height = 0; + if (root != null) { + while (current.avlLeft != null) { + stack[height++] = current; + current = (TNode)current.avlLeft; + } + } + } + + /** + * Reset the iterator, and seeks to the specified key + * @param root the current root of the tree + * @param key the key for the node we are trying to find + * @param keyComparator the comparator to use to match node and key + */ + public void seekTo(final TNode root, final Object key, + final AvlKeyComparator keyComparator) { + current = null; + height = 0; + + TNode node = root; + while (node != null) { + if (keyComparator.compareKey(node, key) >= 0) { + if (node.avlLeft != null) { + stack[height++] = node; + node = (TNode)node.avlLeft; + } else { + current = node; + return; + } + } else { + if (node.avlRight != null) { + stack[height++] = node; + node = (TNode)node.avlRight; + } else { + if (height > 0) { + TNode parent = (TNode)stack[--height]; + while (node == parent.avlRight) { + if (height == 0) { + current = null; + return; + } + node = parent; + parent = (TNode)stack[--height]; + } + current = parent; + return; + } + current = null; + return; + } + } + } + } + + private void seekNext() { + if (current == null) return; + if (current.avlRight != null) { + stack[height++] = current; + current = (TNode)current.avlRight; + while (current.avlLeft != null) { + stack[height++] = current; + current = (TNode)current.avlLeft; + } + } else { + TNode node; + do { + if (height == 0) { + current = null; + return; + } + node = current; + current = (TNode)stack[--height]; + } while (current.avlRight == node); + } + } + } + + /** + * Helper class that allows to create and manipulate a linked list of AvlLinkedNodes + */ + @InterfaceAudience.Private + public static class AvlIterableList { + /** + * @param node the current node + * @return the successor of the current node + */ + public static TNode readNext(TNode node) { + return (TNode)node.iterNext; + } + + /** + * @param node the current node + * @return the predecessor of the current node + */ + public static TNode readPrev(TNode node) { + return (TNode)node.iterPrev; + } + + /** + * @param head the head of the linked list + * @param node the node to add to the front of the list + * @return the new head of the list + */ + public static TNode prepend(TNode head, TNode node) { + assert !isLinked(node) : node + " is already linked"; + if (head != null) { + TNode tail = (TNode)head.iterPrev; + tail.iterNext = node; + head.iterPrev = node; + node.iterNext = head; + node.iterPrev = tail; + } else { + node.iterNext = node; + node.iterPrev = node; + } + return node; + } + + /** + * @param head the head of the linked list + * @param node the node to add to the tail of the list + * @return the new head of the list + */ + public static TNode append(TNode head, TNode node) { + assert !isLinked(node) : node + " is already linked"; + if (head != null) { + TNode tail = (TNode)head.iterPrev; + tail.iterNext = node; + node.iterNext = head; + node.iterPrev = tail; + head.iterPrev = node; + return head; + } + node.iterNext = node; + node.iterPrev = node; + return node; + } + + /** + * @param head the head of the current linked list + * @param otherHead the head of the list to append to the current list + * @return the new head of the current list + */ + public static TNode appendList(TNode head, TNode otherHead) { + if (head == null) return otherHead; + if (otherHead == null) return head; + + TNode tail = (TNode)head.iterPrev; + TNode otherTail = (TNode)otherHead.iterPrev; + tail.iterNext = otherHead; + otherHead.iterPrev = tail; + otherTail.iterNext = head; + head.iterPrev = otherTail; + return head; + } + + /** + * @param head the head of the linked list + * @param node the node to remove from the list + * @return the new head of the list + */ + public static TNode remove(TNode head, TNode node) { + assert isLinked(node) : node + " is not linked"; + if (node != node.iterNext) { + node.iterPrev.iterNext = node.iterNext; + node.iterNext.iterPrev = node.iterPrev; + head = (head == node) ? (TNode)node.iterNext : head; + } else { + head = null; + } + node.iterNext = null; + node.iterPrev = null; + return head; + } + + /** + * @param node the node to check + * @return true if the node is linked to a list, false otherwise + */ + public static boolean isLinked(TNode node) { + return node.iterPrev != null && node.iterNext != null; + } + } +} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestAvlUtil.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestAvlUtil.java new file mode 100644 index 0000000..3c7b680 --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestAvlUtil.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hbase.util; + +import java.util.Random; +import java.util.TreeMap; + +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator; +import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList; +import org.apache.hadoop.hbase.util.AvlUtil.AvlLinkedNode; +import org.apache.hadoop.hbase.util.AvlUtil.AvlNode; +import org.apache.hadoop.hbase.util.AvlUtil.AvlNodeVisitor; +import org.apache.hadoop.hbase.util.AvlUtil.AvlTree; +import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +@Category({MiscTests.class, SmallTests.class}) +public class TestAvlUtil { + private static final TestAvlKeyComparator KEY_COMPARATOR = new TestAvlKeyComparator(); + + @Test + public void testAvlTreeCrud() { + final int MAX_KEY = 99999999; + final int NELEM = 10000; + + final TreeMap treeMap = new TreeMap(); + TestAvlNode root = null; + + final Random rand = new Random(); + for (int i = 0; i < NELEM; ++i) { + int key = rand.nextInt(MAX_KEY); + if (AvlTree.get(root, key, KEY_COMPARATOR) != null) { + i--; + continue; + } + root = AvlTree.insert(root, new TestAvlNode(key)); + treeMap.put(key, null); + for (Integer keyX: treeMap.keySet()) { + TestAvlNode node = AvlTree.get(root, keyX, KEY_COMPARATOR); + assertNotNull(node); + assertEquals(keyX.intValue(), node.getKey()); + } + } + + for (int i = 0; i < NELEM; ++i) { + int key = rand.nextInt(MAX_KEY); + TestAvlNode node = AvlTree.get(root, key, KEY_COMPARATOR); + if (!treeMap.containsKey(key)) { + assert node == null; + continue; + } + treeMap.remove(key); + assertEquals(key, node.getKey()); + root = AvlTree.remove(root, key, KEY_COMPARATOR); + for (Integer keyX: treeMap.keySet()) { + node = AvlTree.get(root, keyX, KEY_COMPARATOR); + assertNotNull(node); + assertEquals(keyX.intValue(), node.getKey()); + } + } + } + + @Test + public void testAvlTreeVisitor() { + final int MIN_KEY = 0; + final int MAX_KEY = 50; + + TestAvlNode root = null; + for (int i = MAX_KEY; i >= MIN_KEY; --i) { + root = AvlTree.insert(root, new TestAvlNode(i)); + } + + AvlTree.visit(root, new AvlNodeVisitor() { + private int prevKey = -1; + public boolean visitNode(TestAvlNode node) { + assertEquals(prevKey, node.getKey() - 1); + assertTrue(node.getKey() >= MIN_KEY); + assertTrue(node.getKey() <= MAX_KEY); + prevKey = node.getKey(); + return node.getKey() <= MAX_KEY; + } + }); + } + + @Test + public void testAvlTreeIterSeekFirst() { + final int MIN_KEY = 1; + final int MAX_KEY = 50; + + TestAvlNode root = null; + for (int i = MIN_KEY; i < MAX_KEY; ++i) { + root = AvlTree.insert(root, new TestAvlNode(i)); + } + + AvlTreeIterator iter = new AvlTreeIterator(root); + assertTrue(iter.hasNext()); + long prevKey = 0; + while (iter.hasNext()) { + TestAvlNode node = iter.next(); + assertEquals(prevKey + 1, node.getKey()); + prevKey = node.getKey(); + } + assertEquals(MAX_KEY - 1, prevKey); + } + + @Test + public void testAvlTreeIterSeekTo() { + final int MIN_KEY = 1; + final int MAX_KEY = 50; + + TestAvlNode root = null; + for (int i = MIN_KEY; i < MAX_KEY; i += 2) { + root = AvlTree.insert(root, new TestAvlNode(i)); + } + + for (int i = MIN_KEY - 1; i < MAX_KEY + 1; ++i) { + AvlTreeIterator iter = new AvlTreeIterator(root, i, KEY_COMPARATOR); + if (i < MAX_KEY) { + assertTrue(iter.hasNext()); + } else { + // searching for something greater than the last node + assertFalse(iter.hasNext()); + break; + } + + TestAvlNode node = iter.next(); + assertEquals((i % 2 == 0) ? i + 1 : i, node.getKey()); + + long prevKey = node.getKey(); + while (iter.hasNext()) { + node = iter.next(); + assertTrue(node.getKey() > prevKey); + prevKey = node.getKey(); + } + } + } + + @Test + public void testAvlIterableListCrud() { + final int NITEMS = 10; + TestLinkedAvlNode prependHead = null; + TestLinkedAvlNode appendHead = null; + // prepend()/append() + for (int i = 0; i <= NITEMS; ++i) { + TestLinkedAvlNode pNode = new TestLinkedAvlNode(i); + assertFalse(AvlIterableList.isLinked(pNode)); + prependHead = AvlIterableList.prepend(prependHead, pNode); + assertTrue(AvlIterableList.isLinked(pNode)); + + TestLinkedAvlNode aNode = new TestLinkedAvlNode(i); + assertFalse(AvlIterableList.isLinked(aNode)); + appendHead = AvlIterableList.append(appendHead, aNode); + assertTrue(AvlIterableList.isLinked(aNode)); + } + // readNext() + TestLinkedAvlNode pNode = prependHead; + TestLinkedAvlNode aNode = appendHead; + for (int i = 0; i <= NITEMS; ++i) { + assertEquals(NITEMS - i, pNode.getKey()); + pNode = AvlIterableList.readNext(pNode); + + assertEquals(i, aNode.getKey()); + aNode = AvlIterableList.readNext(aNode); + } + // readPrev() + pNode = AvlIterableList.readPrev(prependHead); + aNode = AvlIterableList.readPrev(appendHead); + for (int i = 0; i <= NITEMS; ++i) { + assertEquals(i, pNode.getKey()); + pNode = AvlIterableList.readPrev(pNode); + + assertEquals(NITEMS - i, aNode.getKey()); + aNode = AvlIterableList.readPrev(aNode); + } + // appendList() + TestLinkedAvlNode node = AvlIterableList.appendList(prependHead, appendHead); + for (int i = NITEMS; i >= 0; --i) { + assertEquals(i, node.getKey()); + node = AvlIterableList.readNext(node); + } + for (int i = 0; i <= NITEMS; ++i) { + assertEquals(i, node.getKey()); + node = AvlIterableList.readNext(node); + } + } + + private static class TestAvlNode extends AvlNode { + private final int key; + + public TestAvlNode(int key) { + this.key = key; + } + + public int getKey() { + return key; + } + + @Override + public int compareTo(TestAvlNode other) { + return this.key - other.key; + } + + @Override + public String toString() { + return String.format("TestAvlNode(%d)", key); + } + } + + private static class TestLinkedAvlNode extends AvlLinkedNode { + private final int key; + + public TestLinkedAvlNode(int key) { + this.key = key; + } + + public int getKey() { + return key; + } + + @Override + public int compareTo(TestLinkedAvlNode other) { + return this.key - other.key; + } + + @Override + public String toString() { + return String.format("TestLinkedAvlNode(%d)", key); + } + } + + private static class TestAvlKeyComparator implements AvlKeyComparator { + public int compareKey(TestAvlNode node, Object key) { + return node.getKey() - (int)key; + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index d4791fe..b9f9fb0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -40,6 +40,10 @@ import org.apache.hadoop.hbase.master.TableLockManager.TableLock; import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureRunnableSet; +import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator; +import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList; +import org.apache.hadoop.hbase.util.AvlUtil.AvlLinkedNode; +import org.apache.hadoop.hbase.util.AvlUtil.AvlTree; /** * ProcedureRunnableSet for the Master Procedures. @@ -65,13 +69,20 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { private final ReentrantLock schedLock = new ReentrantLock(); private final Condition schedWaitCond = schedLock.newCondition(); + private final static NamespaceQueueKeyComparator NAMESPACE_QUEUE_KEY_COMPARATOR = + new NamespaceQueueKeyComparator(); + private final static ServerQueueKeyComparator SERVER_QUEUE_KEY_COMPARATOR = + new ServerQueueKeyComparator(); + private final static TableQueueKeyComparator TABLE_QUEUE_KEY_COMPARATOR = + new TableQueueKeyComparator(); + private final FairQueue serverRunQueue = new FairQueue(); private final FairQueue tableRunQueue = new FairQueue(); private int queueSize = 0; - private final Object[] serverBuckets = new Object[128]; - private Queue namespaceMap = null; - private Queue tableMap = null; + private final ServerQueue[] serverBuckets = new ServerQueue[128]; + private NamespaceQueue namespaceMap = null; + private TableQueue tableMap = null; private final int metaTablePriority; private final int userTablePriority; @@ -142,7 +153,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { // the queue is not suspended or removed from the fairq (run-queue) // because someone has an xlock on it. // so, if the queue is not-linked we should add it - if (queue.size() == 1 && !IterableList.isLinked(queue)) { + if (queue.size() == 1 && !AvlIterableList.isLinked(queue)) { fairq.add(queue); } queueSize++; @@ -152,7 +163,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { // our (proc) parent has the xlock, // so the queue is not in the fairq (run-queue) // add it back to let the child run (inherit the lock) - if (!IterableList.isLinked(queue)) { + if (!AvlIterableList.isLinked(queue)) { fairq.add(queue); } queueSize++; @@ -230,12 +241,12 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { try { // Remove Servers for (int i = 0; i < serverBuckets.length; ++i) { - clear((ServerQueue)serverBuckets[i], serverRunQueue); + clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR); serverBuckets[i] = null; } // Remove Tables - clear(tableMap, tableRunQueue); + clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR); tableMap = null; assert queueSize == 0 : "expected queue size to be 0, got " + queueSize; @@ -244,11 +255,12 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } } - private > void clear(Queue treeMap, FairQueue fairq) { + private , TNode extends Queue> void clear(TNode treeMap, + final FairQueue fairq, final AvlKeyComparator comparator) { while (treeMap != null) { Queue node = AvlTree.getFirst(treeMap); assert !node.isSuspended() : "can't clear suspended " + node.getKey(); - treeMap = AvlTree.remove(treeMap, node.getKey()); + treeMap = AvlTree.remove(treeMap, node.getKey(), comparator); removeFromRunQueue(fairq, node); } } @@ -302,7 +314,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } private > void addToRunQueue(FairQueue fairq, Queue queue) { - if (IterableList.isLinked(queue)) return; + if (AvlIterableList.isLinked(queue)) return; if (!queue.isEmpty()) { fairq.add(queue); queueSize += queue.size(); @@ -310,7 +322,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } private > void removeFromRunQueue(FairQueue fairq, Queue queue) { - if (!IterableList.isLinked(queue)) return; + if (!AvlIterableList.isLinked(queue)) return; fairq.remove(queue); queueSize -= queue.size(); } @@ -507,11 +519,11 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } private void suspendTableQueue(Queue queue) { - waitingTables = IterableList.append(waitingTables, queue); + waitingTables = AvlIterableList.append(waitingTables, queue); } private void suspendServerQueue(Queue queue) { - waitingServers = IterableList.append(waitingServers, queue); + waitingServers = AvlIterableList.append(waitingServers, queue); } private boolean hasWaitingTables() { @@ -520,7 +532,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { private Queue popWaitingTable() { Queue node = waitingTables; - waitingTables = IterableList.remove(waitingTables, node); + waitingTables = AvlIterableList.remove(waitingTables, node); node.setSuspended(false); return node; } @@ -531,7 +543,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { private Queue popWaitingServer() { Queue node = waitingServers; - waitingServers = IterableList.remove(waitingServers, node); + waitingServers = AvlIterableList.remove(waitingServers, node); node.setSuspended(false); return node; } @@ -555,17 +567,17 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } private TableQueue getTableQueue(TableName tableName) { - Queue node = AvlTree.get(tableMap, tableName); - if (node != null) return (TableQueue)node; + TableQueue node = AvlTree.get(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR); + if (node != null) return node; NamespaceQueue nsQueue = getNamespaceQueue(tableName.getNamespaceAsString()); node = new TableQueue(tableName, nsQueue, getTablePriority(tableName)); tableMap = AvlTree.insert(tableMap, node); - return (TableQueue)node; + return node; } private void removeTableQueue(TableName tableName) { - tableMap = AvlTree.remove(tableMap, tableName); + tableMap = AvlTree.remove(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR); } private int getTablePriority(TableName tableName) { @@ -589,12 +601,12 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { // Namespace Queue Lookup Helpers // ============================================================================ private NamespaceQueue getNamespaceQueue(String namespace) { - Queue node = AvlTree.get(namespaceMap, namespace); + NamespaceQueue node = AvlTree.get(namespaceMap, namespace, NAMESPACE_QUEUE_KEY_COMPARATOR); if (node != null) return (NamespaceQueue)node; node = new NamespaceQueue(namespace); namespaceMap = AvlTree.insert(namespaceMap, node); - return (NamespaceQueue)node; + return node; } // ============================================================================ @@ -610,24 +622,19 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } private ServerQueue getServerQueue(ServerName serverName) { - int index = getBucketIndex(serverBuckets, serverName.hashCode()); - Queue root = getTreeRoot(serverBuckets, index); - Queue node = AvlTree.get(root, serverName); - if (node != null) return (ServerQueue)node; + final int index = getBucketIndex(serverBuckets, serverName.hashCode()); + ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR); + if (node != null) return node; node = new ServerQueue(serverName); - serverBuckets[index] = AvlTree.insert(root, node); + serverBuckets[index] = AvlTree.insert(serverBuckets[index], node); return (ServerQueue)node; } private void removeServerQueue(ServerName serverName) { - int index = getBucketIndex(serverBuckets, serverName.hashCode()); - serverBuckets[index] = AvlTree.remove((ServerQueue)serverBuckets[index], serverName); - } - - @SuppressWarnings("unchecked") - private static > Queue getTreeRoot(Object[] buckets, int index) { - return (Queue) buckets[index]; + final int index = getBucketIndex(serverBuckets, serverName.hashCode()); + final ServerQueue root = serverBuckets[index]; + serverBuckets[index] = AvlTree.remove(root, serverName, SERVER_QUEUE_KEY_COMPARATOR); } private static int getBucketIndex(Object[] buckets, int hashCode) { @@ -645,6 +652,13 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { // ============================================================================ // Table and Server Queue Implementation // ============================================================================ + private static class ServerQueueKeyComparator implements AvlKeyComparator { + @Override + public int compareKey(ServerQueue node, Object key) { + return node.compareKey((ServerName)key); + } + } + public static class ServerQueue extends QueueImpl { public ServerQueue(ServerName serverName) { super(serverName); @@ -699,6 +713,13 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } } + private static class TableQueueKeyComparator implements AvlKeyComparator { + @Override + public int compareKey(TableQueue node, Object key) { + return node.compareKey((TableName)key); + } + } + public static class TableQueue extends QueueImpl { private final NamespaceQueue namespaceQueue; @@ -845,6 +866,13 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } } + private static class NamespaceQueueKeyComparator implements AvlKeyComparator { + @Override + public int compareKey(NamespaceQueue node, Object key) { + return node.compareKey((String)key); + } + } + /** * the namespace is currently used just as a rwlock, not as a queue. * because ns operation are not frequent enough. so we want to avoid @@ -1016,7 +1044,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { if (queue.isEmpty() && queue.tryExclusiveLock(0)) { // remove the table from the run-queue and the map - if (IterableList.isLinked(queue)) { + if (AvlIterableList.isLinked(queue)) { tableRunQueue.remove(queue); } @@ -1260,13 +1288,8 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { boolean isSuspended(); } - private static abstract class Queue> implements QueueInterface { - private Queue avlRight = null; - private Queue avlLeft = null; - private int avlHeight = 1; - - private Queue iterNext = null; - private Queue iterPrev = null; + private static abstract class Queue> + extends AvlLinkedNode> implements QueueInterface { private boolean suspended = false; private long exclusiveLockProcIdOwner = Long.MIN_VALUE; @@ -1358,6 +1381,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { return key.compareTo(cmpKey); } + @Override public int compareTo(Queue other) { return compareKey(other.key); } @@ -1433,13 +1457,13 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { } public void add(Queue queue) { - queueHead = IterableList.append(queueHead, queue); + queueHead = AvlIterableList.append(queueHead, queue); if (currentQueue == null) setNextQueue(queueHead); } public void remove(Queue queue) { - Queue nextQueue = queue.iterNext; - queueHead = IterableList.remove(queueHead, queue); + Queue nextQueue = AvlIterableList.readNext(queue); + queueHead = AvlIterableList.remove(queueHead, queue); if (currentQueue == queue) { setNextQueue(queueHead != null ? nextQueue : null); } @@ -1470,7 +1494,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { private boolean nextQueue() { if (currentQueue == null) return false; - currentQueue = currentQueue.iterNext; + currentQueue = AvlIterableList.readNext(currentQueue); return currentQueue != null; } @@ -1487,187 +1511,4 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet { return Math.max(1, queue.getPriority() * quantum); // TODO } } - - private static class AvlTree { - public static > Queue get(Queue root, T key) { - while (root != null) { - int cmp = root.compareKey(key); - if (cmp > 0) { - root = root.avlLeft; - } else if (cmp < 0) { - root = root.avlRight; - } else { - return root; - } - } - return null; - } - - public static > Queue getFirst(Queue root) { - if (root != null) { - while (root.avlLeft != null) { - root = root.avlLeft; - } - } - return root; - } - - public static > Queue getLast(Queue root) { - if (root != null) { - while (root.avlRight != null) { - root = root.avlRight; - } - } - return root; - } - - public static > Queue insert(Queue root, Queue node) { - if (root == null) return node; - if (node.compareTo(root) < 0) { - root.avlLeft = insert(root.avlLeft, node); - } else { - root.avlRight = insert(root.avlRight, node); - } - return balance(root); - } - - private static > Queue removeMin(Queue p) { - if (p.avlLeft == null) - return p.avlRight; - p.avlLeft = removeMin(p.avlLeft); - return balance(p); - } - - public static > Queue remove(Queue root, T key) { - if (root == null) return null; - - int cmp = root.compareKey(key); - if (cmp == 0) { - Queue q = root.avlLeft; - Queue r = root.avlRight; - if (r == null) return q; - Queue min = getFirst(r); - min.avlRight = removeMin(r); - min.avlLeft = q; - return balance(min); - } else if (cmp > 0) { - root.avlLeft = remove(root.avlLeft, key); - } else /* if (cmp < 0) */ { - root.avlRight = remove(root.avlRight, key); - } - return balance(root); - } - - private static > Queue balance(Queue p) { - fixHeight(p); - int balance = balanceFactor(p); - if (balance == 2) { - if (balanceFactor(p.avlRight) < 0) { - p.avlRight = rotateRight(p.avlRight); - } - return rotateLeft(p); - } else if (balance == -2) { - if (balanceFactor(p.avlLeft) > 0) { - p.avlLeft = rotateLeft(p.avlLeft); - } - return rotateRight(p); - } - return p; - } - - private static > Queue rotateRight(Queue p) { - Queue q = p.avlLeft; - p.avlLeft = q.avlRight; - q.avlRight = p; - fixHeight(p); - fixHeight(q); - return q; - } - - private static > Queue rotateLeft(Queue q) { - Queue p = q.avlRight; - q.avlRight = p.avlLeft; - p.avlLeft = q; - fixHeight(q); - fixHeight(p); - return p; - } - - private static > void fixHeight(Queue node) { - int heightLeft = height(node.avlLeft); - int heightRight = height(node.avlRight); - node.avlHeight = 1 + Math.max(heightLeft, heightRight); - } - - private static > int height(Queue node) { - return node != null ? node.avlHeight : 0; - } - - private static > int balanceFactor(Queue node) { - return height(node.avlRight) - height(node.avlLeft); - } - } - - private static class IterableList { - public static > Queue prepend(Queue head, Queue node) { - assert !isLinked(node) : node + " is already linked"; - if (head != null) { - Queue tail = head.iterPrev; - tail.iterNext = node; - head.iterPrev = node; - node.iterNext = head; - node.iterPrev = tail; - } else { - node.iterNext = node; - node.iterPrev = node; - } - return node; - } - - public static > Queue append(Queue head, Queue node) { - assert !isLinked(node) : node + " is already linked"; - if (head != null) { - Queue tail = head.iterPrev; - tail.iterNext = node; - node.iterNext = head; - node.iterPrev = tail; - head.iterPrev = node; - return head; - } - node.iterNext = node; - node.iterPrev = node; - return node; - } - - public static > Queue appendList(Queue head, Queue otherHead) { - if (head == null) return otherHead; - if (otherHead == null) return head; - - Queue tail = head.iterPrev; - Queue otherTail = otherHead.iterPrev; - tail.iterNext = otherHead; - otherHead.iterPrev = tail; - otherTail.iterNext = head; - head.iterPrev = otherTail; - return head; - } - - private static > Queue remove(Queue head, Queue node) { - assert isLinked(node) : node + " is not linked"; - if (node != node.iterNext) { - node.iterPrev.iterNext = node.iterNext; - node.iterNext.iterPrev = node.iterPrev; - head = (head == node) ? node.iterNext : head; - } else { - head = null; - } - node.iterNext = null; - node.iterPrev = null; - return head; - } - - private static > boolean isLinked(Queue node) { - return node.iterPrev != null && node.iterNext != null; - } - } } \ No newline at end of file