From 361407b2f4c6b007c74fd770f2cf19a87f89f103 Mon Sep 17 00:00:00 2001 From: chenheng Date: Fri, 6 Nov 2015 00:16:07 +0800 Subject: [PATCH] HBASE-14279 Race condition in ConcurrentIndex --- .../apache/hadoop/hbase/util/ConcurrentIndex.java | 80 +++++++++++++++------- 1 file changed, 54 insertions(+), 26 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcurrentIndex.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcurrentIndex.java index 3b4a1f1..c04258c 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcurrentIndex.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcurrentIndex.java @@ -22,8 +22,8 @@ package org.apache.hadoop.hbase.util; import java.util.Comparator; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import java.util.Map; +import java.util.HashMap; import java.util.concurrent.ConcurrentSkipListSet; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -49,8 +49,11 @@ import com.google.common.base.Supplier; @InterfaceStability.Evolving public class ConcurrentIndex { - /** Container for the sets, indexed by key */ - private final ConcurrentMap> container; + /** The length is a power of 2. */ + @SuppressWarnings("rawtypes") + private final Map[] segments; + private static final int MAX_SEGMENTS = 1 << 16; + private static final int DEFAULT_CONCURRENCY_LEVEL = 16; /** * A factory that constructs new instances of the sets if no set is @@ -64,8 +67,23 @@ public class ConcurrentIndex { * @param valueSetFactory The factory instance */ public ConcurrentIndex(Supplier> valueSetFactory) { + this(valueSetFactory, DEFAULT_CONCURRENCY_LEVEL); + } + + public ConcurrentIndex(Supplier> valueSetFactory, + int concurrencyLevel) { this.valueSetFactory = valueSetFactory; - this.container = new ConcurrentHashMap>(); + int size = Integer.highestOneBit(concurrencyLevel - 1) << 1; + size = Math.min(MAX_SEGMENTS, Math.max(1, size)); + segments = new Map[size]; + for (int i=0; i < segments.length; i++) { + segments[i] = new HashMap>(); + } + } + + @SuppressWarnings("unchecked") + private Map> segments(K key) { + return segments[hash(key) & segments.length - 1]; } /** @@ -87,20 +105,14 @@ public class ConcurrentIndex { * @param value An additional unique value we want to associate with a key */ public void put(K key, V value) { - Set set = container.get(key); - if (set != null) { - set.add(value); - } else { - set = valueSetFactory.get(); - set.add(value); - Set existing = container.putIfAbsent(key, set); - if (existing != null) { - // If a set is already associated with a key, that means another - // writer has already come in and created the set for the given key. - // Pursuant to an optimistic concurrency policy, in this case we will - // simply add the value to the existing set associated with the key. - existing.add(value); + Map> seg = segments(key); + synchronized(seg) { + Set set = seg.get(key); + if (set == null) { + set = valueSetFactory.get(); + seg.put(key, set); } + set.add(value); } } @@ -115,7 +127,8 @@ public class ConcurrentIndex { * are associated with the key. */ public Set values(K key) { - return container.get(key); + Map> seg = segments(key); + return seg.get(key); } /** @@ -126,17 +139,32 @@ public class ConcurrentIndex { * @param value The value to disassociate with the key */ public boolean remove(K key, V value) { - Set set = container.get(key); - boolean success = false; - if (set != null) { - success = set.remove(value); - if (set.isEmpty()) { - container.remove(key); + Map> seg = segments(key); + synchronized(seg) { + Set set = seg.get(key); + if (set != null) { + return set.remove(key); } + return false; } - return success; } + + private static int hash(Object k) { + // Copied from ConcurrentHashMap + int h = k.hashCode(); + + // Spread bits to regularize both segment and index locations, + // using variant of single-word Wang/Jenkins hash. + h += (h << 15) ^ 0xffffcd7d; + h ^= (h >>> 10); + h += (h << 3); + h ^= (h >>> 6); + h += (h << 2) + (h << 14); + return h ^ (h >>> 16); + } + + /** * Default factory class for the sets associated with given keys. Creates * a {@link ConcurrentSkipListSet} using the comparator passed into the -- 2.3.8 (Apple Git-58)