diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index 12668e9..d993f44 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -29,15 +29,15 @@ import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.Set; +import java.util.SortedSet; import java.util.TreeMap; +import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -68,8 +68,8 @@ import org.apache.hadoop.hbase.master.handler.SplitRegionHandler; import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException; import org.apache.hadoop.hbase.regionserver.RegionOpeningState; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; -import org.apache.hadoop.hbase.util.KeyLocker; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.PrimaryIdLock; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.RootRegionTracker; import org.apache.hadoop.hbase.zookeeper.ZKAssign; @@ -109,7 +109,7 @@ public class AssignmentManager extends ZooKeeperListener { private LoadBalancer balancer; - final private KeyLocker locker = new KeyLocker(); + final private PrimaryIdLock idLock = new PrimaryIdLock(); /** * Map of regions to reopen after the schema of a table is changed. Key - @@ -441,7 +441,7 @@ public class AssignmentManager extends ZooKeeperListener { // It has no reason to be a lock shared with the other operations. // We can do the lock on the region only, instead of a global lock: what we want to ensure // is that we don't have two threads working on the same region. - Lock lock = locker.acquireLock(encodedRegionName); + PrimaryIdLock.Entry lockEntry = idLock.getLockEntry(encodedRegionName); try { Stat stat = new Stat(); byte [] data = ZKAssign.getDataAndWatch(watcher, encodedRegionName, stat); @@ -461,7 +461,7 @@ public class AssignmentManager extends ZooKeeperListener { processRegionsInTransition(rt, hri, stat.getVersion()); return true; } finally { - lock.unlock(); + lockEntry.release(); } } @@ -651,7 +651,7 @@ public class AssignmentManager extends ZooKeeperListener { } // We need a lock on the region as we could update it - Lock lock = locker.acquireLock(encodedName); + PrimaryIdLock.Entry lockEntry = idLock.getLockEntry(encodedName); try { RegionState latestState = regionStates.getRegionTransitionState(encodedName); @@ -806,7 +806,7 @@ public class AssignmentManager extends ZooKeeperListener { throw new IllegalStateException("Received event is not valid."); } } finally { - lock.unlock(); + lockEntry.release(); } } @@ -927,7 +927,7 @@ public class AssignmentManager extends ZooKeeperListener { @Override public void run() { String regionName = ZKAssign.getRegionName(watcher, path); - Lock lock = locker.acquireLock(regionName); + PrimaryIdLock.Entry lockEntry = idLock.getLockEntry(regionName); try { RegionState rs = regionStates.getRegionTransitionState(regionName); if (rs == null) return; @@ -961,7 +961,7 @@ public class AssignmentManager extends ZooKeeperListener { } } } finally { - lock.unlock(); + lockEntry.release(); } } }); @@ -1179,18 +1179,33 @@ public class AssignmentManager extends ZooKeeperListener { return; } String encodedName = region.getEncodedName(); - Lock lock = locker.acquireLock(encodedName); + PrimaryIdLock.Entry lockEntry = idLock.getLockEntry(encodedName); try { RegionState state = forceRegionStateToOffline(region, forceNewPlan); if (state != null) { assign(state, setOfflineInZK, forceNewPlan); } } finally { - lock.unlock(); + lockEntry.release(); } } /** + * Acquire locks for a set of keys. The keys will be + * sorted internally to avoid possible deadlock. + */ + private Map getLockEntries(Set keys) { + SortedSet sortedKeys = new TreeSet(keys); + + Map lockEntries = new HashMap(); + for (String key : sortedKeys) { + lockEntries.put(key, idLock.getLockEntry(key)); + } + + return lockEntries; + } + + /** * Bulk assign regions to destination. * @param destination * @param regions Regions to assign. @@ -1211,7 +1226,7 @@ public class AssignmentManager extends ZooKeeperListener { } List failedToOpenRegions = new ArrayList(); - Map locks = locker.acquireLocks(encodedNames); + Map lockEntries = getLockEntries(encodedNames); try { AtomicInteger counter = new AtomicInteger(0); Map offlineNodesVersions = new ConcurrentHashMap(); @@ -1230,8 +1245,8 @@ public class AssignmentManager extends ZooKeeperListener { LOG.warn("failed to force region state to offline or " + "failed to set it offline in ZK, will reassign later: " + region); failedToOpenRegions.add(region); // assign individually later - Lock lock = locks.remove(encodedRegionName); - lock.unlock(); + PrimaryIdLock.Entry lockEntry = lockEntries.remove(encodedRegionName); + lockEntry.release(); } } @@ -1265,8 +1280,8 @@ public class AssignmentManager extends ZooKeeperListener { if (nodeVersion == null || nodeVersion.intValue() == -1) { LOG.warn("failed to offline in zookeeper: " + region); failedToOpenRegions.add(region); // assign individually later - Lock lock = locks.remove(encodedRegionName); - lock.unlock(); + PrimaryIdLock.Entry lockEntry = lockEntries.remove(encodedRegionName); + lockEntry.release(); } else { regionStates.updateRegionState(region, RegionState.State.PENDING_OPEN, destination); @@ -1348,8 +1363,8 @@ public class AssignmentManager extends ZooKeeperListener { throw new RuntimeException(e); } } finally { - for (Lock lock : locks.values()) { - lock.unlock(); + for (PrimaryIdLock.Entry lockEntry : lockEntries.values()) { + lockEntry.release(); } } @@ -1831,7 +1846,7 @@ public class AssignmentManager extends ZooKeeperListener { int versionOfClosingNode = -1; // We need a lock here as we're going to do a put later and we don't want multiple states // creation - ReentrantLock lock = locker.acquireLock(encodedName); + PrimaryIdLock.Entry lockEntry = idLock.getLockEntry(encodedName); RegionState state = regionStates.getRegionTransitionState(encodedName); try { if (state == null) { @@ -1897,7 +1912,7 @@ public class AssignmentManager extends ZooKeeperListener { unassign(region, state, versionOfClosingNode, dest, true); } finally { - lock.unlock(); + lockEntry.release(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java index 7890eb0..b547918 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java @@ -1,5 +1,4 @@ /* - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,103 +19,115 @@ package org.apache.hadoop.hbase.util; import java.io.IOException; import java.io.InterruptedIOException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.classification.InterfaceAudience; /** - * Allows multiple concurrent clients to lock on a numeric id with a minimal - * memory overhead. The intended usage is as follows: + * Allows multiple concurrent clients to lock on a numeric or non-null object id, + * with low contention except for waiting for a lock. + * This is reentrant. + * The intended usage is as follows: * - *
- * IdLock.Entry lockEntry = idLock.getLockEntry(id);
- * try {
- *   // User code.
- * } finally {
- *   idLock.releaseLockEntry(lockEntry);
- * }
+ *
     IdLock.Entry lockEntry = idLock.getLockEntry(id);
+ *     try {
+ *         // User code.
+ *     } finally {
+ *         lockEntry.release();
+ *     }
*/ @InterfaceAudience.Private public class IdLock { + private final PrimaryIdLock primaryIdLock = new PrimaryIdLock(); /** An entry returned to the client as a lock object */ public static class Entry { - private final long id; - private int numWaiters; - private boolean isLocked = true; + private final PrimaryIdLock.Entry primaryEntry; + + private Entry(PrimaryIdLock.Entry primaryEntry) { + this.primaryEntry = primaryEntry; + } - private Entry(long id) { - this.id = id; + /** + * Releases the corresponding lock. + * It is required to call with the same thread which you get this instance with. + * In practice it is appropriate to call this method in a finally block. + */ + public void release() { + primaryEntry.release(); } + @Override public String toString() { - return "id=" + id + ", numWaiter=" + numWaiters + ", isLocked=" - + isLocked; + return "Entry[id=" + primaryEntry.getId() + + ", released=" + primaryEntry.isReleased() + "]"; } } - private ConcurrentMap map = - new ConcurrentHashMap(); - /** - * Blocks until the lock corresponding to the given id is acquired. + * Blocks until the lock corresponding to the the given {@code id} is acquired, + * and returns an entry to release the lock by {@link Entry#release()}; + * This is equivalent to the other overloaded method {@link #getLockEntry(Object)} + * with the wrapper object of the given {@code id}, + * which object is an instance of {@code Long}. + *

+ * Pay attention to which locks are same or not if you also call {@code getLockEntry(Object)}. + * Also calling with a {@code byte}, {@code char}, {@code short} or {@code int} parameter + * is usually bound to this method rather than {@code getLockEntry(Object)} with auto-boxing, + * and ids whose values are equal are regarded as equal, + * regardless of types of their actual parameters. * * @param id an arbitrary number to lock on - * @return an "entry" to pass to {@link #releaseLockEntry(Entry)} to release - * the lock + * @return an "entry" to release the lock by {@link Entry#release()} * @throws IOException if interrupted + * @see #getLockEntry(Object) */ public Entry getLockEntry(long id) throws IOException { - Entry entry = new Entry(id); - Entry existing; - while ((existing = map.putIfAbsent(entry.id, entry)) != null) { - synchronized (existing) { - if (existing.isLocked) { - ++existing.numWaiters; // Add ourselves to waiters. - while (existing.isLocked) { - try { - existing.wait(); - } catch (InterruptedException e) { - --existing.numWaiters; // Remove ourselves from waiters. - throw new InterruptedIOException( - "Interrupted waiting to acquire sparse lock"); - } - } + return getLockEntry(Long.valueOf(id)); + } + + /** + * Blocks until the lock corresponding to the the given {@code id} is acquired, + * and returns an entry to release the lock by {@link Entry#release()}. + * + * @param id an arbitrary non-null object to lock on + * @return an "entry" to release the lock by {@link Entry#release()} + * @throws IOException if interrupted + * @throws NullPointerException if {@code id} is null + * @see #getLockEntry(long) + */ + public Entry getLockEntry(Object id) throws IOException { + PrimaryIdLock.Entry primaryEntry; - --existing.numWaiters; // Remove ourselves from waiters. - existing.isLocked = true; - return existing; - } - // If the entry is not locked, it might already be deleted from the - // map, so we cannot return it. We need to get our entry into the map - // or get someone else's locked entry. - } + try { + primaryEntry = primaryIdLock.getLockEntryInterruptibly(id); + + } catch (InterruptedException e) { + throw new InterruptedIOException( + "Interrupted waiting to acquire sparse lock"); } - return entry; + + return new Entry(primaryEntry); } /** - * Must be called in a finally block to decrease the internal counter and - * remove the monitor object for the given id if the caller is the last - * client. + * Releases the corresponding lock. + * It is required to call with the same thread which you get the {@code entry} with. + * In practice it is appropriate to call this method in a finally block. * - * @param entry the return value of {@link #getLockEntry(long)} + * @param entry the return value of {@link #getLockEntry(long)} or {@link #getLockEntry(Object)} + * @deprecated Replaced by {@code entry.release()}. */ + @Deprecated public void releaseLockEntry(Entry entry) { - synchronized (entry) { - entry.isLocked = false; - if (entry.numWaiters > 0) { - entry.notify(); - } else { - map.remove(entry.id); - } - } + entry.release(); } - /** For testing */ - void assertMapEmpty() { - assert map.size() == 0; + /** + * For testing; + * this method is expected to return true + * only if the internal map to hold locks keeps empty while invoking this method. + */ + boolean isEmpty() { + return primaryIdLock.isEmpty(); } - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/KeyLocker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/KeyLocker.java index c753f33..93e5d53 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/KeyLocker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/KeyLocker.java @@ -47,7 +47,9 @@ import org.apache.commons.logging.LogFactory; * } * } *

+ * @deprecated Use {@link PrimaryIdLock} instead. */ +@Deprecated public class KeyLocker> { private static final Log LOG = LogFactory.getLog(KeyLocker.class); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/PrimaryIdLock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/PrimaryIdLock.java new file mode 100644 index 0000000..88d11fa --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/PrimaryIdLock.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Allows multiple concurrent clients to lock on a non-null object id, + * with low contention except for waiting for a lock. + * This is reentrant. + * The intended usage is as follows: + * + *
     PrimaryIdLock.Entry lockEntry = idLock.getLockEntry(id);
+ *     try {
+ *         // User code.
+ *     } finally {
+ *         lockEntry.release();
+ *     }
+ */ +@InterfaceAudience.Private +public class PrimaryIdLock { + /** + * The count starts at 1, and once the count turns down to 0, + * this instance is regarded as disabled and never becomes enabled. + * This irreversibility is expected to use to keep an unused object from reuse, + * and remove the unused object from a collection without blocking threads. + */ + private static class ParticipationCount { + final AtomicLong countRef = new AtomicLong(1); + + boolean incrementAndIsEnabled() { + while (true) { + long count = countRef.get(); + if (count == 0) { + return false; + } + + long nextCount = count + 1; + + if (nextCount == 0) { + throw new Error("Maximum participation count exceeded"); + } + + if (countRef.compareAndSet(count, nextCount)) { + return true; + } + } + } + + boolean decrementAndIsEnabled() { + while (true) { + long count = countRef.get(); + if (count == 0) { + return false; + } + + long nextCount = count - 1; + + if (countRef.compareAndSet(count, nextCount)) { + return nextCount != 0; + } + } + } + } + + /** + * Increment the participation count before participating in the lock, + * and decrement the count after leaving the lock. + * In short, the {@code participationCount} is a sort of reservation count for the {@code lock}, + * and if once the {@code participationCount} is disabled + * the {@code lock} is ensured not to be used anymore. + */ + private static class ExclusiveControl { + final ParticipationCount participationCount = new ParticipationCount(); + final ReentrantLock lock = new ReentrantLock(); + } + + /** An entry returned to the client as a lock object */ + public class Entry { + private final Object id; + private final ExclusiveControl control; + private final AtomicBoolean released = new AtomicBoolean(); + + private Entry(Object id, ExclusiveControl control) { + this.id = id; + this.control = control; + } + + /** + * Releases the corresponding lock. + * It is required to call with the same thread which you get this instance with. + * In practice it is appropriate to call this method in a finally block. + */ + public void release() { + if (control.lock.isHeldByCurrentThread() && released.compareAndSet(false, true)) { + control.lock.unlock(); + returnControl(id, control); + } + } + + /** + * Returns id related to this lock-entry. + */ + public Object getId() { + return id; + } + + /** + * Returns true if this lock-entry is already released. + */ + public boolean isReleased() { + return released.get(); + } + + @Override + public String toString() { + return "Entry[id=" + getId() + ", released=" + isReleased() + "]"; + } + } + + private final ConcurrentMap controlMap = + new ConcurrentHashMap(); + + private ExclusiveControl prepareControl(Object id) { + // Calling ConcurrentHashMap.get() before putIfAbsent() is an idiom for performance; + // get() generally doesn't block in contrast to putIfAbsent(). + + ExclusiveControl control = controlMap.get(id); + if (control != null) { + if (control.participationCount.incrementAndIsEnabled()) { + return control; + } + + controlMap.remove(id, control); + } + + final ExclusiveControl newControl = new ExclusiveControl(); + + while (true) { + control = controlMap.putIfAbsent(id, newControl); + if (control == null) { + return newControl; + } + + if (control.participationCount.incrementAndIsEnabled()) { + return control; + } + + controlMap.remove(id, control); + } + } + + private void returnControl(Object id, ExclusiveControl control) { + if (! control.participationCount.decrementAndIsEnabled()) { + controlMap.remove(id, control); + } + } + + /** + * Blocks interruptibly until the lock corresponding to the the given {@code id} is acquired, + * and returns an entry to release the lock by {@link Entry#release()}. + * + * @param id an arbitrary non-null object to lock on + * @return an "entry" to release the lock by {@link Entry#release()} + * @throws InterruptedException if interrupted + * @throws NullPointerException if {@code id} is null + * @see #getLockEntry(Object) + */ + public Entry getLockEntryInterruptibly(Object id) throws InterruptedException { + if(id == null) { + throw new NullPointerException(); + } + + ExclusiveControl control = prepareControl(id); + + try { + control.lock.lockInterruptibly(); + + } catch (InterruptedException e) { + returnControl(id, control); + throw e; + } + + return new Entry(id, control); + } + + /** + * Blocks uninterruptibly until the lock corresponding to the the given {@code id} is acquired, + * and returns an entry to release the lock by {@link Entry#release()}. + * + * @param id an arbitrary non-null object to lock on + * @return an "entry" to release the lock by {@link Entry#release()} + * @throws NullPointerException if {@code id} is null + * @see #getLockEntryInterruptibly(Object) + */ + public Entry getLockEntry(Object id) { + if(id == null) { + throw new NullPointerException(); + } + + ExclusiveControl control = prepareControl(id); + control.lock.lock(); + return new Entry(id, control); + } + + /** + * For testing; According to the implementation of {@link ConcurrentHashMap#isEmpty()}, + * this method is expected to return true + * only if the map keeps empty while invoking this method. + */ + boolean isEmpty() { + return controlMap.isEmpty(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdLock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdLock.java index bbf4bba..74d18ad 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdLock.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdLock.java @@ -1,5 +1,4 @@ /* - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,98 +15,310 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.util; -import java.util.Map; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import static org.junit.Assert.*; +import java.util.concurrent.FutureTask; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.hbase.MediumTests; import org.junit.Test; import org.junit.experimental.categories.Category; @Category(MediumTests.class) -// Medium as it creates 100 threads; seems better to run it isolated +// Medium as it creates many threads; seems better to run it isolated public class TestIdLock { + @Test(timeout=10000) + public void testBlockForEqualIds() throws Exception { + final IdLock idLock = new IdLock(); - private static final Log LOG = LogFactory.getLog(TestIdLock.class); + IdLock.Entry entry = idLock.getLockEntry(0); - private static final int NUM_IDS = 16; - private static final int NUM_THREADS = 128; - private static final int NUM_SECONDS = 15; + final AtomicBoolean unlocked = new AtomicBoolean(); - private IdLock idLock = new IdLock(); + class LockAndReleaseTask implements Callable { + @Override + public Void call() throws Exception { + IdLock.Entry entry = idLock.getLockEntry(0); + try { + unlocked.set(true); + } finally { + entry.release(); + } + return null; + } + } - private Map idOwner = new ConcurrentHashMap(); + FutureTask futureTask = new FutureTask(new LockAndReleaseTask()); + new Thread(futureTask).start(); - private class IdLockTestThread implements Callable { + Thread.sleep(100); + assertFalse(unlocked.get()); - private String clientId; + entry.release(); + Thread.sleep(100); + assertTrue(unlocked.get()); - public IdLockTestThread(String clientId) { - this.clientId = clientId; - } + futureTask.get(); + assertTrue(idLock.isEmpty()); + } - @Override - public Boolean call() throws Exception { - Thread.currentThread().setName(clientId); - Random rand = new Random(); - long endTime = System.currentTimeMillis() + NUM_SECONDS * 1000; - while (System.currentTimeMillis() < endTime) { - long id = rand.nextInt(NUM_IDS); + @Test(timeout=10000) + public void testNotBlockForDifferentIds() throws Exception { + final IdLock idLock = new IdLock(); - IdLock.Entry lockEntry = idLock.getLockEntry(id); + IdLock.Entry entry = idLock.getLockEntry(0); + + final AtomicBoolean unlocked = new AtomicBoolean(); + + class LockAndReleaseTask implements Callable { + @Override + public Void call() throws Exception { + IdLock.Entry entry = idLock.getLockEntry(1); try { - int sleepMs = 1 + rand.nextInt(4); - String owner = idOwner.get(id); - if (owner != null) { - LOG.error("Id " + id + " already taken by " + owner + ", " - + clientId + " failed"); - return false; - } + unlocked.set(true); + } finally { + entry.release(); + } + return null; + } + } + + FutureTask futureTask = new FutureTask(new LockAndReleaseTask()); + new Thread(futureTask).start(); + + Thread.sleep(100); + assertTrue(unlocked.get()); + + entry.release(); + Thread.sleep(100); + + futureTask.get(); + assertTrue(idLock.isEmpty()); + } + + @Test(timeout=10000) + public void testUnblockOneByOne() throws Exception { + final int threadCount = 10; + + final IdLock idLock = new IdLock(); - idOwner.put(id, clientId); - Thread.sleep(sleepMs); - idOwner.remove(id); + IdLock.Entry entry = idLock.getLockEntry(0); + final AtomicBoolean unlocked = new AtomicBoolean(); + + class LockAndReleaseTask implements Callable { + @Override + public Void call() throws Exception { + IdLock.Entry entry = idLock.getLockEntry(0); + try { + assertFalse(unlocked.get()); + unlocked.set(true); + Thread.sleep(100); + unlocked.set(false); } finally { - idLock.releaseLockEntry(lockEntry); + entry.release(); } + return null; } - return true; } + List> futureTasks = new ArrayList>(); + for (int i = 0; i < threadCount; i++) { + futureTasks.add(new FutureTask(new LockAndReleaseTask())); + } + + for (FutureTask task : futureTasks) { + new Thread(task).start(); + } + + entry.release(); + + for (FutureTask task : futureTasks) { + task.get(); + } + assertTrue(idLock.isEmpty()); } - @Test - public void testMultipleClients() throws Exception { - ExecutorService exec = Executors.newFixedThreadPool(NUM_THREADS); - try { - ExecutorCompletionService ecs = - new ExecutorCompletionService(exec); - for (int i = 0; i < NUM_THREADS; ++i) - ecs.submit(new IdLockTestThread("client_" + i)); - for (int i = 0; i < NUM_THREADS; ++i) { - Future result = ecs.take(); - assertTrue(result.get()); + @Test(timeout=10000) + public void testCompetitionBetweenReleaseAndLock() throws Exception { + final int loopCount = 10000; + + final IdLock idLock = new IdLock(); + final CyclicBarrier readyBarrier = new CyclicBarrier(2); + final CyclicBarrier nextBarrier = new CyclicBarrier(2); + + class ReleaseTask implements Callable { + @Override + public Void call() throws Exception { + for (int i = 0; i < loopCount; i++) { + IdLock.Entry entry = idLock.getLockEntry(0); + readyBarrier.await(); + entry.release(); + nextBarrier.await(); + } + return null; } - idLock.assertMapEmpty(); - } finally { - exec.shutdown(); } + + class BlockTask implements Callable { + @Override + public Void call() throws Exception { + for (int i = 0; i < loopCount; i++) { + readyBarrier.await(); + IdLock.Entry entry = idLock.getLockEntry(0); + entry.release(); + nextBarrier.await(); + } + return null; + } + } + + FutureTask releaseFutureTask = new FutureTask(new ReleaseTask()); + new Thread(releaseFutureTask).start(); + + FutureTask blockFutureTask = new FutureTask(new BlockTask()); + new Thread(blockFutureTask).start(); + + releaseFutureTask.get(); + blockFutureTask.get(); + assertTrue(idLock.isEmpty()); } + @Test(timeout=10000) + public void testCompetitionBetweenLocks() throws Exception { + final int loopCount = 10000; + final int threadCount = 10; -} + final IdLock idLock = new IdLock(); + final CyclicBarrier readyBarrier = new CyclicBarrier(threadCount); + final CyclicBarrier nextBarrier = new CyclicBarrier(threadCount); + + class BlockTask implements Callable { + @Override + public Void call() throws Exception { + for (int i = 0; i < loopCount; i++) { + readyBarrier.await(); + IdLock.Entry entry = idLock.getLockEntry(0); + entry.release(); + nextBarrier.await(); + } + return null; + } + } + + List> blockFutureTasks = new ArrayList>(); + for (int i = 0; i < threadCount; i++) { + blockFutureTasks.add(new FutureTask(new BlockTask())); + } + + for (FutureTask task : blockFutureTasks) { + new Thread(task).start(); + } + + for (FutureTask task : blockFutureTasks) { + task.get(); + } + assertTrue(idLock.isEmpty()); + } + + private static class IdLockIntensiveTestTaskProvider { + final IdLock idLock; + final int idCount; + final long executionIntervalMillis; + + final ConcurrentMap idOwner = new ConcurrentHashMap(); + final Random rand = new Random(); + + IdLockIntensiveTestTaskProvider(IdLock idLock, int idCount, long executionIntervalMillis) { + this.idLock = idLock; + this.idCount = idCount; + this.executionIntervalMillis = executionIntervalMillis; + } + + int nextTaskId; + + Callable nextTask() { + return new IdLockIntensiveTestTask("task_" + nextTaskId++); + } + + class IdLockIntensiveTestTask implements Callable { + final String taskName; + + IdLockIntensiveTestTask(String taskName) { + this.taskName = taskName; + } + + @Override + public String call() throws Exception { + long endTime = System.currentTimeMillis() + executionIntervalMillis; + while (System.currentTimeMillis() < endTime) { + long id = rand.nextInt(idCount); + IdLock.Entry lockEntry = idLock.getLockEntry(id); + try { + String existingTaskName = idOwner.putIfAbsent(id, taskName); + if (existingTaskName != null) { + return "Id " + id + " already taken by " + existingTaskName + ", " + + taskName + " failed"; + } + + int sleepMs = 1 + rand.nextInt(4); + Thread.sleep(sleepMs); + idOwner.remove(id, taskName); + + } finally { + lockEntry.release(); + } + } + return null; + } + } + } + + @Test(timeout=60000) + public void testIntensively() throws Exception { + final int idCount = 16; + final int executionIntervalMillis = 15 * 1000; + final int threadCount = 128; + + IdLock idLock = new IdLock(); + + IdLockIntensiveTestTaskProvider taskProvider = + new IdLockIntensiveTestTaskProvider(idLock, idCount, executionIntervalMillis); + + ExecutorService service = Executors.newFixedThreadPool(threadCount); + try { + ExecutorCompletionService completionService = + new ExecutorCompletionService(service); + + for (int i = 0; i < threadCount; i++) { + completionService.submit(taskProvider.nextTask()); + } + + for (int i = 0; i < threadCount; i++) { + Future result = completionService.take(); + assertNull(result.get()); + } + + } finally { + service.shutdown(); + } + + assertTrue(idLock.isEmpty()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestPrimaryIdLock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestPrimaryIdLock.java new file mode 100644 index 0000000..3949d2d --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestPrimaryIdLock.java @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.hbase.MediumTests; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +// Medium as it creates many threads; seems better to run it isolated +public class TestPrimaryIdLock { + @Test(timeout=10000) + public void testBlockForEqualIds() throws Exception { + final PrimaryIdLock idLock = new PrimaryIdLock(); + + PrimaryIdLock.Entry entry = idLock.getLockEntryInterruptibly(0); + + final AtomicBoolean unlocked = new AtomicBoolean(); + + class LockAndReleaseTask implements Callable { + @Override + public Void call() throws Exception { + PrimaryIdLock.Entry entry = idLock.getLockEntryInterruptibly(0); + try { + unlocked.set(true); + } finally { + entry.release(); + } + return null; + } + } + + FutureTask futureTask = new FutureTask(new LockAndReleaseTask()); + new Thread(futureTask).start(); + + Thread.sleep(100); + assertFalse(unlocked.get()); + + entry.release(); + Thread.sleep(100); + assertTrue(unlocked.get()); + + futureTask.get(); + assertTrue(idLock.isEmpty()); + } + + @Test(timeout=10000) + public void testNotBlockForDifferentIds() throws Exception { + final PrimaryIdLock idLock = new PrimaryIdLock(); + + PrimaryIdLock.Entry entry = idLock.getLockEntryInterruptibly(0); + + final AtomicBoolean unlocked = new AtomicBoolean(); + + class LockAndReleaseTask implements Callable { + @Override + public Void call() throws Exception { + PrimaryIdLock.Entry entry = idLock.getLockEntryInterruptibly(1); + try { + unlocked.set(true); + } finally { + entry.release(); + } + return null; + } + } + + FutureTask futureTask = new FutureTask(new LockAndReleaseTask()); + new Thread(futureTask).start(); + + Thread.sleep(100); + assertTrue(unlocked.get()); + + entry.release(); + Thread.sleep(100); + + futureTask.get(); + assertTrue(idLock.isEmpty()); + } + + @Test(timeout=10000) + public void testInterruptBlock() throws Exception { + final PrimaryIdLock idLock = new PrimaryIdLock(); + + PrimaryIdLock.Entry entry = idLock.getLockEntryInterruptibly(0); + + class LockAndReleaseTask implements Callable { + @Override + public Void call() throws Exception { + idLock.getLockEntryInterruptibly(0); + return null; + } + } + + FutureTask futureTask = new FutureTask(new LockAndReleaseTask()); + Thread thread = new Thread(futureTask); + thread.start(); + + Thread.sleep(100); + thread.interrupt(); + + try { + futureTask.get(); + Assert.fail(); + } catch(ExecutionException e) { + assertTrue(e.getCause() instanceof InterruptedException); + } + + entry.release(); + assertTrue(idLock.isEmpty()); + } + + @Test(timeout=10000) + public void testUninterruptableBlock() throws Exception { + final PrimaryIdLock idLock = new PrimaryIdLock(); + + PrimaryIdLock.Entry entry = idLock.getLockEntryInterruptibly(0); + + final AtomicBoolean unlocked = new AtomicBoolean(); + + class LockAndReleaseTask implements Callable { + @Override + public Void call() throws Exception { + PrimaryIdLock.Entry entry = idLock.getLockEntry(0); + try { + unlocked.set(true); + } finally { + entry.release(); + } + return null; + } + } + + FutureTask futureTask = new FutureTask(new LockAndReleaseTask()); + Thread thread = new Thread(futureTask); + thread.start(); + + Thread.sleep(100); + thread.interrupt(); + + try { + futureTask.get(100, TimeUnit.MICROSECONDS); + Assert.fail(); + } catch(TimeoutException e) { + // expected + } + + assertFalse(unlocked.get()); + + entry.release(); + Thread.sleep(100); + assertTrue(unlocked.get()); + + futureTask.get(); + assertTrue(idLock.isEmpty()); + } + + @Test(timeout=10000) + public void testUnblockOneByOne() throws Exception { + final int threadCount = 10; + + final PrimaryIdLock idLock = new PrimaryIdLock(); + + PrimaryIdLock.Entry entry = idLock.getLockEntryInterruptibly(0); + + final AtomicBoolean unlocked = new AtomicBoolean(); + + class LockAndReleaseTask implements Callable { + @Override + public Void call() throws Exception { + PrimaryIdLock.Entry entry = idLock.getLockEntryInterruptibly(0); + try { + assertFalse(unlocked.get()); + unlocked.set(true); + Thread.sleep(100); + unlocked.set(false); + } finally { + entry.release(); + } + return null; + } + } + + List> futureTasks = new ArrayList>(); + for (int i = 0; i < threadCount; i++) { + futureTasks.add(new FutureTask(new LockAndReleaseTask())); + } + + for (FutureTask task : futureTasks) { + new Thread(task).start(); + } + + entry.release(); + + for (FutureTask task : futureTasks) { + task.get(); + } + assertTrue(idLock.isEmpty()); + } + + @Test(timeout=10000) + public void testCompetitionBetweenReleaseAndLock() throws Exception { + final int loopCount = 10000; + + final PrimaryIdLock idLock = new PrimaryIdLock(); + final CyclicBarrier readyBarrier = new CyclicBarrier(2); + final CyclicBarrier nextBarrier = new CyclicBarrier(2); + + class ReleaseTask implements Callable { + @Override + public Void call() throws Exception { + for (int i = 0; i < loopCount; i++) { + PrimaryIdLock.Entry entry = idLock.getLockEntryInterruptibly(0); + readyBarrier.await(); + entry.release(); + nextBarrier.await(); + } + return null; + } + } + + class BlockTask implements Callable { + @Override + public Void call() throws Exception { + for (int i = 0; i < loopCount; i++) { + readyBarrier.await(); + PrimaryIdLock.Entry entry = idLock.getLockEntryInterruptibly(0); + entry.release(); + nextBarrier.await(); + } + return null; + } + } + + FutureTask releaseFutureTask = new FutureTask(new ReleaseTask()); + new Thread(releaseFutureTask).start(); + + FutureTask blockFutureTask = new FutureTask(new BlockTask()); + new Thread(blockFutureTask).start(); + + releaseFutureTask.get(); + blockFutureTask.get(); + assertTrue(idLock.isEmpty()); + } + + @Test(timeout=10000) + public void testCompetitionBetweenLocks() throws Exception { + final int loopCount = 10000; + final int threadCount = 10; + + final PrimaryIdLock idLock = new PrimaryIdLock(); + final CyclicBarrier readyBarrier = new CyclicBarrier(threadCount); + final CyclicBarrier nextBarrier = new CyclicBarrier(threadCount); + + class BlockTask implements Callable { + @Override + public Void call() throws Exception { + for (int i = 0; i < loopCount; i++) { + readyBarrier.await(); + PrimaryIdLock.Entry entry = idLock.getLockEntryInterruptibly(0); + entry.release(); + nextBarrier.await(); + } + return null; + } + } + + List> blockFutureTasks = new ArrayList>(); + for (int i = 0; i < threadCount; i++) { + blockFutureTasks.add(new FutureTask(new BlockTask())); + } + + for (FutureTask task : blockFutureTasks) { + new Thread(task).start(); + } + + for (FutureTask task : blockFutureTasks) { + task.get(); + } + assertTrue(idLock.isEmpty()); + } + + private static class IdLockIntensiveTestTaskProvider { + final PrimaryIdLock idLock; + final int idCount; + final long executionIntervalMillis; + + final ConcurrentMap idOwner = new ConcurrentHashMap(); + final Random rand = new Random(); + + IdLockIntensiveTestTaskProvider(PrimaryIdLock idLock, int idCount, long executionIntervalMillis) { + this.idLock = idLock; + this.idCount = idCount; + this.executionIntervalMillis = executionIntervalMillis; + } + + int nextTaskId; + + Callable nextTask() { + return new IdLockIntensiveTestTask("task_" + nextTaskId++); + } + + class IdLockIntensiveTestTask implements Callable { + final String taskName; + + IdLockIntensiveTestTask(String taskName) { + this.taskName = taskName; + } + + @Override + public String call() throws Exception { + long endTime = System.currentTimeMillis() + executionIntervalMillis; + while (System.currentTimeMillis() < endTime) { + long id = rand.nextInt(idCount); + + PrimaryIdLock.Entry lockEntry = idLock.getLockEntryInterruptibly(id); + try { + String existingTaskName = idOwner.putIfAbsent(id, taskName); + if (existingTaskName != null) { + return "Id " + id + " already taken by " + existingTaskName + ", " + + taskName + " failed"; + } + + int sleepMs = 1 + rand.nextInt(4); + Thread.sleep(sleepMs); + idOwner.remove(id, taskName); + + } finally { + lockEntry.release(); + } + } + return null; + } + } + } + + @Test(timeout=60000) + public void testIntensively() throws Exception { + final int idCount = 16; + final int executionIntervalMillis = 15 * 1000; + final int threadCount = 128; + + PrimaryIdLock idLock = new PrimaryIdLock(); + + IdLockIntensiveTestTaskProvider taskProvider = + new IdLockIntensiveTestTaskProvider(idLock, idCount, executionIntervalMillis); + + ExecutorService service = Executors.newFixedThreadPool(threadCount); + try { + ExecutorCompletionService completionService = + new ExecutorCompletionService(service); + + for (int i = 0; i < threadCount; i++) { + completionService.submit(taskProvider.nextTask()); + } + + for (int i = 0; i < threadCount; i++) { + Future result = completionService.take(); + assertNull(result.get()); + } + + } finally { + service.shutdown(); + } + + assertTrue(idLock.isEmpty()); + } +}