From 3881ec9e66ac665ac5e9199b433754957a1d2033 Mon Sep 17 00:00:00 2001 From: chenheng Date: Thu, 27 Aug 2015 16:54:45 +0800 Subject: [PATCH] HBASE-14279 Race condition in ConcurrentIndex --- .../apache/hadoop/hbase/util/ConcurrentIndex.java | 108 +++++++++++++++++---- 1 file changed, 88 insertions(+), 20 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcurrentIndex.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcurrentIndex.java index 3b4a1f1..c726885 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcurrentIndex.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcurrentIndex.java @@ -52,6 +52,8 @@ public class ConcurrentIndex { /** Container for the sets, indexed by key */ private final ConcurrentMap> container; + private final ConcurrentMap lockMap; + /** * A factory that constructs new instances of the sets if no set is * associated with a given key. @@ -66,6 +68,7 @@ public class ConcurrentIndex { public ConcurrentIndex(Supplier> valueSetFactory) { this.valueSetFactory = valueSetFactory; this.container = new ConcurrentHashMap>(); + this.lockMap = new ConcurrentHashMap(); } /** @@ -87,19 +90,27 @@ public class ConcurrentIndex { * @param value An additional unique value we want to associate with a key */ public void put(K key, V value) { - Set set = container.get(key); - if (set != null) { - set.add(value); - } else { - set = valueSetFactory.get(); - set.add(value); - Set existing = container.putIfAbsent(key, set); - if (existing != null) { - // If a set is already associated with a key, that means another - // writer has already come in and created the set for the given key. - // Pursuant to an optimistic concurrency policy, in this case we will - // simply add the value to the existing set associated with the key. - existing.add(value); + Entry lock = null; + try { + lock = lock(key); + Set set = container.get(key); + if (set != null) { + set.add(value); + } else { + set = valueSetFactory.get(); + set.add(value); + Set existing = container.putIfAbsent(key, set); + if (existing != null) { + // If a set is already associated with a key, that means another + // writer has already come in and created the set for the given key. + // Pursuant to an optimistic concurrency policy, in this case we will + // simply add the value to the existing set associated with the key. + existing.add(value); + } + } + } finally { + if (lock != null) { + releaseLock(lock); } } } @@ -126,15 +137,72 @@ public class ConcurrentIndex { * @param value The value to disassociate with the key */ public boolean remove(K key, V value) { - Set set = container.get(key); - boolean success = false; - if (set != null) { - success = set.remove(value); - if (set.isEmpty()) { - container.remove(key); + Entry lock = null; + try { + lock = lock(key); + Set set = container.get(key); + boolean success = false; + if (set != null) { + success = set.remove(value); + if (set.isEmpty()) { + container.remove(key); + } + } + return success; + } finally { + if (lock != null) { + releaseLock(lock); } } - return success; + } + + private Entry lock(K key) { + Entry entry = new Entry(key); + Entry existing; + while ((existing = lockMap.putIfAbsent(entry.key, entry)) != null) { + synchronized (existing) { + if (existing.isLocked) { + ++existing.numWaiters; // Add ourselves to waiters. + while (existing.isLocked) { + try { + existing.wait(); + } catch (InterruptedException e) { + --existing.numWaiters; // Remove ourselves from waiters. + } + } + + --existing.numWaiters; // Remove ourselves from waiters. + existing.isLocked = true; + return existing; + } + // If the entry is not locked, it might already be deleted from the + // map, so we cannot return it. We need to get our entry into the map + // or get someone else's locked entry. + } + } + return entry; + } + + + private void releaseLock(Entry entry) { + synchronized (entry) { + entry.isLocked = false; + if (entry.numWaiters > 0) { + entry.notify(); + } else { + lockMap.remove(entry.key); + } + } + } + + private class Entry { + private final K key; + private int numWaiters; + private boolean isLocked = true; + + private Entry(K key) { + this.key = key; + } } /** -- 1.9.3 (Apple Git-50)