From 6a7212f23f8fca02ea9cade15d759d2426401ce7 Mon Sep 17 00:00:00 2001
From: chenheng
Date: Fri, 6 Nov 2015 11:42:19 +0800
Subject: [PATCH] HBASE-14279 Race condition in ConcurrentIndex

---
 .../apache/hadoop/hbase/util/ConcurrentIndex.java  | 92 +++++++++++++++------
 .../hadoop/hbase/io/hfile/bucket/BucketCache.java  |  4 +-
 .../hbase/io/hfile/bucket/TestBucketCache.java     | 61 ++++++++++++--
 3 files changed, 121 insertions(+), 36 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcurrentIndex.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcurrentIndex.java
index 3b4a1f1..af7db77 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcurrentIndex.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcurrentIndex.java
@@ -22,10 +22,11 @@
 package org.apache.hadoop.hbase.util;
 
 import java.util.Comparator;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListSet;
 
+import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
@@ -49,8 +51,11 @@ import com.google.common.base.Supplier;
 @InterfaceStability.Evolving
 public class ConcurrentIndex<K, V> {
 
-  /** Container for the sets, indexed by key */
-  private final ConcurrentMap<K, Set<V>> container;
+  /** Map segments, each guarded by its own monitor; the length is a power of 2. */
+  @SuppressWarnings("rawtypes")
+  private final Map[] segments;
+  private static final int MAX_SEGMENTS = 1 << 16;
+  private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
 
   /**
    * A factory that constructs new instances of the sets if no set is
@@ -64,8 +69,23 @@
    * @param valueSetFactory The factory instance
    */
   public ConcurrentIndex(Supplier<Set<V>> valueSetFactory) {
+    this(valueSetFactory, DEFAULT_CONCURRENCY_LEVEL);
+  }
+
+  public ConcurrentIndex(Supplier<Set<V>> valueSetFactory,
+      int concurrencyLevel) {
     this.valueSetFactory = valueSetFactory;
-    this.container = new ConcurrentHashMap<K, Set<V>>();
+    int size = Integer.highestOneBit(concurrencyLevel - 1) << 1;
+    size = Math.min(MAX_SEGMENTS, Math.max(1, size));
+    segments = new Map[size];
+    for (int i = 0; i < segments.length; i++) {
+      segments[i] = new HashMap<K, Set<V>>();
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private Map<K, Set<V>> segments(K key) {
+    return segments[hash(key) & (segments.length - 1)];
   }
 
   /**
@@ -87,20 +107,14 @@
    * @param value An additional unique value we want to associate with a key
    */
   public void put(K key, V value) {
-    Set<V> set = container.get(key);
-    if (set != null) {
-      set.add(value);
-    } else {
-      set = valueSetFactory.get();
-      set.add(value);
-      Set<V> existing = container.putIfAbsent(key, set);
-      if (existing != null) {
-        // If a set is already associated with a key, that means another
-        // writer has already come in and created the set for the given key.
-        // Pursuant to an optimistic concurrency policy, in this case we will
-        // simply add the value to the existing set associated with the key.
-        existing.add(value);
+    Map<K, Set<V>> seg = segments(key);
+    synchronized (seg) {
+      Set<V> set = seg.get(key);
+      if (set == null) {
+        set = valueSetFactory.get();
+        seg.put(key, set);
       }
+      set.add(value);
     }
   }
 
@@ -115,7 +129,14 @@
    * are associated with the key.
   */
  public Set<V> values(K key) {
-    return container.get(key);
+    Map<K, Set<V>> seg = segments(key);
+    synchronized (seg) {
+      Set<V> set = seg.get(key);
+      if (set == null) {
+        return null;
+      }
+      return ImmutableSet.copyOf(set);
+    }
  }
 
  /**
@@ -126,17 +147,37 @@
    * @param value The value to disassociate with the key
    */
   public boolean remove(K key, V value) {
-    Set<V> set = container.get(key);
-    boolean success = false;
-    if (set != null) {
-      success = set.remove(value);
-      if (set.isEmpty()) {
-        container.remove(key);
+    Map<K, Set<V>> seg = segments(key);
+    synchronized (seg) {
+      Set<V> set = seg.get(key);
+      if (set != null) {
+        if (set.remove(value)) {
+          if (set.isEmpty()) {
+            seg.remove(key);
+          }
+          return true;
+        }
       }
+      return false;
     }
-    return success;
   }
 
+
+  private static int hash(Object k) {
+    // Copied from ConcurrentHashMap
+    int h = k.hashCode();
+
+    // Spread bits to regularize both segment and index locations,
+    // using variant of single-word Wang/Jenkins hash.
+    h += (h << 15) ^ 0xffffcd7d;
+    h ^= (h >>> 10);
+    h += (h << 3);
+    h ^= (h >>> 6);
+    h += (h << 2) + (h << 14);
+    return h ^ (h >>> 16);
+  }
+
+
   /**
    * Default factory class for the sets associated with given keys. Creates
    * a {@link ConcurrentSkipListSet} using the comparator passed into the
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 5eb6f8f..5548fae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -1129,13 +1129,11 @@ public class BucketCache implements BlockCache, HeapSize {
       return 0;
     }
     int numEvicted = 0;
-    List<BlockCacheKey> keysForHFile = ImmutableList.copyOf(keySet);
-    for (BlockCacheKey key : keysForHFile) {
+    for (BlockCacheKey key : keySet) {
       if (evictBlock(key)) {
         ++numEvicted;
       }
     }
-
     return numEvicted;
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index 70d21dd..f61757b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -18,15 +18,9 @@
  */
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Random;
+import java.util.*;
 
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
 import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
@@ -35,6 +29,7 @@ import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.ConcurrentIndex;
 import org.apache.hadoop.hbase.util.IdLock;
 import org.junit.After;
 import org.junit.Before;
@@ -43,6 +38,8 @@ import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import static org.junit.Assert.*;
+
 /**
  * Basic test of BucketCache.Puts and gets.
  * <p>

@@ -219,4 +216,54 @@ public class TestBucketCache {
     assertTrue(cache.getCurrentSize() > 0L);
     assertTrue("We should have a block!", cache.iterator().hasNext());
   }
+
+  @Test
+  public void testConcurrentIndex() {
+
+    ConcurrentIndex<String, BlockCacheKey> concurrentIndex =
+        new ConcurrentIndex<String, BlockCacheKey>(new Comparator<BlockCacheKey>() {
+          @Override
+          public int compare(BlockCacheKey a, BlockCacheKey b) {
+            if (a.getOffset() == b.getOffset()) {
+              return 0;
+            } else if (a.getOffset() < b.getOffset()) {
+              return -1;
+            }
+            return 1;
+          }
+        });
+
+    String hfileName = "testfile";
+    int len = 10;
+    BlockCacheKey[] blockCacheKeys = new BlockCacheKey[len];
+    for (int i = 0; i < len; i++) {
+      blockCacheKeys[i] = new BlockCacheKey(hfileName, i);
+      concurrentIndex.put(blockCacheKeys[i].getHfileName(), blockCacheKeys[i]);
+    }
+
+    Set<BlockCacheKey> values = concurrentIndex.values(hfileName);
+
+    assertEquals(len, values.size());
+    int index = 0;
+    for (BlockCacheKey blockCacheKey : values) {
+      assertEquals(blockCacheKey, blockCacheKeys[index++]);
+    }
+
+    hfileName = "testfile1";
+    BlockCacheKey toRemove1 = new BlockCacheKey(hfileName, 1);
+    concurrentIndex.put(hfileName, toRemove1);
+    BlockCacheKey toRemove2 = new BlockCacheKey(hfileName, 1);
+    concurrentIndex.put(hfileName, toRemove2);
+    assertTrue(concurrentIndex.remove(hfileName, toRemove1));
+    assertNull(concurrentIndex.values(hfileName));
+    assertFalse(concurrentIndex.remove(hfileName, toRemove1));
+
+    BlockCacheKey toRemove3 = new BlockCacheKey(hfileName, 3);
+    concurrentIndex.put(hfileName, toRemove3);
+    BlockCacheKey toRemove4 = new BlockCacheKey(hfileName, 4);
+    concurrentIndex.put(hfileName, toRemove4);
+    assertTrue(concurrentIndex.remove(hfileName, toRemove3));
+    assertEquals(1, concurrentIndex.values(hfileName).size());
+
+  }
 }
-- 
1.9.3 (Apple Git-50)
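
Reviewer's note, offered as commentary rather than as part of the patch: the race in HBASE-14279 comes from the old optimistic scheme, where put() could add a value to a set that a concurrent remove() had just detached from the ConcurrentHashMap (silently losing the update), and where values() handed callers a live set that other threads could mutate mid-iteration. The sketch below restates the lock-striping scheme the patch adopts, outside of HBase: the class and method names are illustrative, and it substitutes an unmodifiable copy where the patch uses Guava's ImmutableSet.

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Illustrative lock-striped multimap mirroring the patched ConcurrentIndex. */
public class StripedSetIndex<K, V> {

  /** One plain HashMap per stripe; the array length is a power of two. */
  private final Map<K, Set<V>>[] segments;

  @SuppressWarnings("unchecked")
  public StripedSetIndex(int concurrencyLevel) {
    int size = Integer.highestOneBit(concurrencyLevel - 1) << 1;
    size = Math.min(1 << 16, Math.max(1, size));
    segments = new Map[size];
    for (int i = 0; i < size; i++) {
      segments[i] = new HashMap<K, Set<V>>();
    }
  }

  /** Picks the stripe for a key; the mask is valid because the length is 2^n. */
  private Map<K, Set<V>> segment(K key) {
    return segments[spread(key.hashCode()) & (segments.length - 1)];
  }

  public void put(K key, V value) {
    Map<K, Set<V>> seg = segment(key);
    synchronized (seg) {           // the whole get/create/add sequence is atomic
      Set<V> set = seg.get(key);
      if (set == null) {
        set = new HashSet<V>();
        seg.put(key, set);
      }
      set.add(value);
    }
  }

  public boolean remove(K key, V value) {
    Map<K, Set<V>> seg = segment(key);
    synchronized (seg) {
      Set<V> set = seg.get(key);
      if (set == null || !set.remove(value)) {
        return false;
      }
      if (set.isEmpty()) {
        seg.remove(key);           // no put() can sneak in between remove and cleanup
      }
      return true;
    }
  }

  /** Returns a snapshot, so callers never iterate a set another thread mutates. */
  public Set<V> values(K key) {
    Map<K, Set<V>> seg = segment(key);
    synchronized (seg) {
      Set<V> set = seg.get(key);
      return set == null ? null : Collections.unmodifiableSet(new HashSet<V>(set));
    }
  }

  // Same single-word Wang/Jenkins variant the patch copies from ConcurrentHashMap.
  private static int spread(int h) {
    h += (h << 15) ^ 0xffffcd7d;
    h ^= (h >>> 10);
    h += (h << 3);
    h ^= (h >>> 6);
    h += (h << 2) + (h << 14);
    return h ^ (h >>> 16);
  }
}

Because every mutation of a stripe runs under that stripe's monitor, the get/create/add sequence in put() and the remove-then-cleanup sequence in remove() are atomic with respect to each other; the snapshot returned by values() is also why the patch can drop the ImmutableList.copyOf() defensive copy in BucketCache.evictBlocksByHfileName().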
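
One more illustrative snippet, again not part of the patch: the constructor rounds concurrencyLevel up to a power of two and clamps it to [1, MAX_SEGMENTS], which is what makes hash(key) & (segments.length - 1) a valid replacement for a modulo. A quick check of that arithmetic, using only expressions copied from the patch:

/** Illustrative only: exercises the segment-sizing arithmetic from the patch. */
public class SegmentSizingDemo {
  private static final int MAX_SEGMENTS = 1 << 16; // same clamp as the patch

  public static void main(String[] args) {
    for (int level : new int[] { 1, 2, 15, 16, 17, 100 }) {
      int size = Integer.highestOneBit(level - 1) << 1; // next power of two >= level
      size = Math.min(MAX_SEGMENTS, Math.max(1, size)); // clamp to [1, 65536]
      // prints: 1 -> 1, 2 -> 2, 15 -> 16, 16 -> 16, 17 -> 32, 100 -> 128
      System.out.println(level + " -> " + size);
    }
  }
}

The Wang/Jenkins spread step exists for the same reason: the mask keeps only the low bits of the hash, so without spreading, keys whose hashCodes differ only in their high bits would pile into a single segment.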