();
+ private void returnControl(Object id, ExclusiveControl control) {
+ if (! control.participationCount.decrementAndIsEnabled()) {
+ controlMap.remove(id, control);
+ }
+ }
/**
- * Blocks until the lock corresponding to the given id is acquired.
+ * Blocks until the lock corresponding to the the given {@code id} is acquired,
+ * and returns an entry to release the lock by {@link Entry#release()};
+ * This is equivalent to the other overloaded method {@link #getLockEntry(Object)}
+ * with the wrapper object of the given {@code id},
+ * which object is an instance of {@code Long}.
+ *
+ * Pay attention to which locks are same or not if you also call {@code getLockEntry(Object)}.
+ * Also calling with a {@code byte}, {@code char}, {@code short} or {@code int} parameter
+ * is usually bound to this method rather than {@code getLockEntry(Object)} with auto-boxing,
+ * and ids whose values are equal are regarded as equal,
+ * regardless of types of their actual parameters.
*
* @param id an arbitrary number to lock on
- * @return an "entry" to pass to {@link #releaseLockEntry(Entry)} to release
- * the lock
+ * @return an "entry" to release the lock by {@link Entry#release()}
* @throws IOException if interrupted
+ * @see #getLockEntry(Object)
*/
public Entry getLockEntry(long id) throws IOException {
- Entry entry = new Entry(id);
- Entry existing;
- while ((existing = map.putIfAbsent(entry.id, entry)) != null) {
- synchronized (existing) {
- if (existing.isLocked) {
- ++existing.numWaiters; // Add ourselves to waiters.
- while (existing.isLocked) {
- try {
- existing.wait();
- } catch (InterruptedException e) {
- --existing.numWaiters; // Remove ourselves from waiters.
- throw new InterruptedIOException(
- "Interrupted waiting to acquire sparse lock");
- }
- }
-
- --existing.numWaiters; // Remove ourselves from waiters.
- existing.isLocked = true;
- return existing;
- }
- // If the entry is not locked, it might already be deleted from the
- // map, so we cannot return it. We need to get our entry into the map
- // or get someone else's locked entry.
- }
- }
- return entry;
+ return getLockEntry(Long.valueOf(id));
}
/**
- * Must be called in a finally block to decrease the internal counter and
- * remove the monitor object for the given id if the caller is the last
- * client.
+ * Blocks until the lock corresponding to the the given {@code id} is acquired,
+ * and returns an entry to release the lock by {@link Entry#release()}.
*
- * @param entry the return value of {@link #getLockEntry(long)}
+ * @param id an arbitrary non-null object to lock on
+ * @return an "entry" to release the lock by {@link Entry#release()}
+ * @throws IOException if interrupted
+ * @throws NullPointerException if {@code id} is null
+ * @see #getLockEntry(long)
*/
- public void releaseLockEntry(Entry entry) {
- synchronized (entry) {
- entry.isLocked = false;
- if (entry.numWaiters > 0) {
- entry.notify();
- } else {
- map.remove(entry.id);
- }
+ public Entry getLockEntry(Object id) throws IOException {
+ if(id == null) {
+ throw new NullPointerException();
+ }
+
+ ExclusiveControl control = prepareControl(id);
+
+ try {
+ control.lock.lockInterruptibly();
+
+ } catch (InterruptedException e) {
+ returnControl(id, control);
+ throw new InterruptedIOException(
+ "Interrupted waiting to acquire sparse lock");
}
+
+ return new Entry(id, control);
}
- /** For testing */
- void assertMapEmpty() {
- assert map.size() == 0;
+ /**
+ * Releases the corresponding lock.
+ * It is required to call with the same thread which you get the {@code entry} with.
+ * In practice it is appropriate to call this method in a finally block.
+ *
+ * @param entry the return value of {@link #getLockEntry(long)} or {@link #getLockEntry(Object)}
+ * @deprecated Replaced by {@code entry.release()}.
+ */
+ @Deprecated
+ public void releaseLockEntry(Entry entry) {
+ entry.release();
}
+ /**
+ * For testing; According to the implementation of {@link ConcurrentHashMap#isEmpty()},
+ * this method is expected to return true
+ * only if the map keeps empty while invoking this method.
+ */
+ boolean isEmpty() {
+ return controlMap.isEmpty();
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdLock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdLock.java
index bbf4bba..fb72019 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdLock.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdLock.java
@@ -19,95 +19,308 @@
package org.apache.hadoop.hbase.util;
-import java.util.Map;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import static org.junit.Assert.*;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.MediumTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(MediumTests.class)
-// Medium as it creates 100 threads; seems better to run it isolated
+// Medium as it creates many threads; seems better to run it isolated
public class TestIdLock {
+ @Test(timeout=10000)
+ public void testBlockForEqualIds() throws Exception {
+ final IdLock idLock = new IdLock();
- private static final Log LOG = LogFactory.getLog(TestIdLock.class);
+ IdLock.Entry entry = idLock.getLockEntry(0);
+
+ final AtomicBoolean unlocked = new AtomicBoolean();
+
+ class LockAndReleaseTask implements Callable {
+ @Override
+ public Void call() throws Exception {
+ IdLock.Entry entry = idLock.getLockEntry(0);
+ try {
+ unlocked.set(true);
+ } finally {
+ entry.release();
+ }
+ return null;
+ }
+ }
- private static final int NUM_IDS = 16;
- private static final int NUM_THREADS = 128;
- private static final int NUM_SECONDS = 15;
+ FutureTask futureTask = new FutureTask(new LockAndReleaseTask());
+ new Thread(futureTask).start();
- private IdLock idLock = new IdLock();
+ Thread.sleep(100);
+ assertFalse(unlocked.get());
- private Map idOwner = new ConcurrentHashMap();
+ entry.release();
+ Thread.sleep(100);
+ assertTrue(unlocked.get());
- private class IdLockTestThread implements Callable {
+ futureTask.get();
+ assertTrue(idLock.isEmpty());
+ }
- private String clientId;
+ @Test(timeout=10000)
+ public void testNotBlockForDifferentIds() throws Exception {
+ final IdLock idLock = new IdLock();
- public IdLockTestThread(String clientId) {
- this.clientId = clientId;
- }
+ IdLock.Entry entry = idLock.getLockEntry(0);
- @Override
- public Boolean call() throws Exception {
- Thread.currentThread().setName(clientId);
- Random rand = new Random();
- long endTime = System.currentTimeMillis() + NUM_SECONDS * 1000;
- while (System.currentTimeMillis() < endTime) {
- long id = rand.nextInt(NUM_IDS);
+ final AtomicBoolean unlocked = new AtomicBoolean();
- IdLock.Entry lockEntry = idLock.getLockEntry(id);
+ class LockAndReleaseTask implements Callable {
+ @Override
+ public Void call() throws Exception {
+ IdLock.Entry entry = idLock.getLockEntry(1);
try {
- int sleepMs = 1 + rand.nextInt(4);
- String owner = idOwner.get(id);
- if (owner != null) {
- LOG.error("Id " + id + " already taken by " + owner + ", "
- + clientId + " failed");
- return false;
- }
+ unlocked.set(true);
+ } finally {
+ entry.release();
+ }
+ return null;
+ }
+ }
+
+ FutureTask futureTask = new FutureTask(new LockAndReleaseTask());
+ new Thread(futureTask).start();
+
+ Thread.sleep(100);
+ assertTrue(unlocked.get());
+
+ entry.release();
+ Thread.sleep(100);
+
+ futureTask.get();
+ assertTrue(idLock.isEmpty());
+ }
+
+ @Test(timeout=10000)
+ public void testUnblockOneByOne() throws Exception {
+ final int threadCount = 10;
- idOwner.put(id, clientId);
- Thread.sleep(sleepMs);
- idOwner.remove(id);
+ final IdLock idLock = new IdLock();
+ IdLock.Entry entry = idLock.getLockEntry(0);
+
+ final AtomicBoolean unlocked = new AtomicBoolean();
+
+ class LockAndReleaseTask implements Callable {
+ @Override
+ public Void call() throws Exception {
+ IdLock.Entry entry = idLock.getLockEntry(0);
+ try {
+ assertFalse(unlocked.get());
+ unlocked.set(true);
+ Thread.sleep(100);
+ unlocked.set(false);
} finally {
- idLock.releaseLockEntry(lockEntry);
+ entry.release();
}
+ return null;
}
- return true;
}
+ List> futureTasks = new ArrayList>();
+ for (int i = 0; i < threadCount; i++) {
+ futureTasks.add(new FutureTask(new LockAndReleaseTask()));
+ }
+
+ for (FutureTask task : futureTasks) {
+ new Thread(task).start();
+ }
+
+ entry.release();
+
+ for (FutureTask task : futureTasks) {
+ task.get();
+ }
+ assertTrue(idLock.isEmpty());
}
- @Test
- public void testMultipleClients() throws Exception {
- ExecutorService exec = Executors.newFixedThreadPool(NUM_THREADS);
- try {
- ExecutorCompletionService ecs =
- new ExecutorCompletionService(exec);
- for (int i = 0; i < NUM_THREADS; ++i)
- ecs.submit(new IdLockTestThread("client_" + i));
- for (int i = 0; i < NUM_THREADS; ++i) {
- Future result = ecs.take();
- assertTrue(result.get());
+ @Test(timeout=10000)
+ public void testCompetitionBetweenReleaseAndLock() throws Exception {
+ final int loopCount = 10000;
+
+ final IdLock idLock = new IdLock();
+ final CyclicBarrier readyBarrier = new CyclicBarrier(2);
+ final CyclicBarrier nextBarrier = new CyclicBarrier(2);
+
+ class ReleaseTask implements Callable {
+ @Override
+ public Void call() throws Exception {
+ for (int i = 0; i < loopCount; i++) {
+ IdLock.Entry entry = idLock.getLockEntry(0);
+ readyBarrier.await();
+ entry.release();
+ nextBarrier.await();
+ }
+ return null;
}
- idLock.assertMapEmpty();
- } finally {
- exec.shutdown();
}
+
+ class BlockTask implements Callable {
+ @Override
+ public Void call() throws Exception {
+ for (int i = 0; i < loopCount; i++) {
+ readyBarrier.await();
+ IdLock.Entry entry = idLock.getLockEntry(0);
+ entry.release();
+ nextBarrier.await();
+ }
+ return null;
+ }
+ }
+
+ FutureTask releaseFutureTask = new FutureTask(new ReleaseTask());
+ new Thread(releaseFutureTask).start();
+
+ FutureTask blockFutureTask = new FutureTask(new BlockTask());
+ new Thread(blockFutureTask).start();
+
+ releaseFutureTask.get();
+ blockFutureTask.get();
+ assertTrue(idLock.isEmpty());
}
+ @Test(timeout=10000)
+ public void testCompetitionBetweenLocks() throws Exception {
+ final int loopCount = 10000;
+ final int threadCount = 10;
-}
+ final IdLock idLock = new IdLock();
+ final CyclicBarrier readyBarrier = new CyclicBarrier(threadCount);
+ final CyclicBarrier nextBarrier = new CyclicBarrier(threadCount);
+ class BlockTask implements Callable {
+ @Override
+ public Void call() throws Exception {
+ for (int i = 0; i < loopCount; i++) {
+ readyBarrier.await();
+ IdLock.Entry entry = idLock.getLockEntry(0);
+ entry.release();
+ nextBarrier.await();
+ }
+ return null;
+ }
+ }
+
+ List> blockFutureTasks = new ArrayList>();
+ for (int i = 0; i < threadCount; i++) {
+ blockFutureTasks.add(new FutureTask(new BlockTask()));
+ }
+
+ for (FutureTask task : blockFutureTasks) {
+ new Thread(task).start();
+ }
+
+ for (FutureTask task : blockFutureTasks) {
+ task.get();
+ }
+ assertTrue(idLock.isEmpty());
+ }
+
+ private static class IdLockIntensiveTestTaskProvider {
+ final IdLock idLock;
+ final int idCount;
+ final long executionIntervalMillis;
+
+ final ConcurrentMap idOwner = new ConcurrentHashMap();
+ final Random rand = new Random();
+
+ IdLockIntensiveTestTaskProvider(IdLock idLock, int idCount, long executionIntervalMillis) {
+ this.idLock = idLock;
+ this.idCount = idCount;
+ this.executionIntervalMillis = executionIntervalMillis;
+ }
+
+ int nextTaskId;
+
+ Callable nextTask() {
+ return new IdLockIntensiveTestTask("task_" + nextTaskId++);
+ }
+
+ class IdLockIntensiveTestTask implements Callable {
+ final String taskName;
+
+ IdLockIntensiveTestTask(String taskName) {
+ this.taskName = taskName;
+ }
+
+ @Override
+ public String call() throws Exception {
+ long endTime = System.currentTimeMillis() + executionIntervalMillis;
+ while (System.currentTimeMillis() < endTime) {
+ long id = rand.nextInt(idCount);
+
+ IdLock.Entry lockEntry = idLock.getLockEntry(id);
+ try {
+ String existingTaskName = idOwner.putIfAbsent(id, taskName);
+ if (existingTaskName != null) {
+ return "Id " + id + " already taken by " + existingTaskName + ", "
+ + taskName + " failed";
+ }
+
+ int sleepMs = 1 + rand.nextInt(4);
+ Thread.sleep(sleepMs);
+ idOwner.remove(id, taskName);
+
+ } finally {
+ lockEntry.release();
+ }
+ }
+ return null;
+ }
+ }
+ }
+
+ @Test(timeout=60000)
+ public void testIntensively() throws Exception {
+ final int idCount = 16;
+ final int executionIntervalMillis = 15 * 1000;
+ final int threadCount = 128;
+
+ IdLock idLock = new IdLock();
+
+ IdLockIntensiveTestTaskProvider taskProvider =
+ new IdLockIntensiveTestTaskProvider(idLock, idCount, executionIntervalMillis);
+
+ ExecutorService service = Executors.newFixedThreadPool(threadCount);
+ try {
+ ExecutorCompletionService completionService =
+ new ExecutorCompletionService(service);
+
+ for (int i = 0; i < threadCount; i++) {
+ completionService.submit(taskProvider.nextTask());
+ }
+
+ for (int i = 0; i < threadCount; i++) {
+ Future result = completionService.take();
+ assertNull(result.get());
+ }
+
+ } finally {
+ service.shutdown();
+ }
+
+ assertTrue(idLock.isEmpty());
+ }
+}