From 990fed02cc1df6053f157d56f9decd67a13c9fa3 Mon Sep 17 00:00:00 2001 From: chenheng Date: Wed, 23 Dec 2015 10:46:14 +0800 Subject: [PATCH] HBASE-14279 Race condition in ConcurrentIndex --- .../apache/hadoop/hbase/util/ConcurrentIndex.java | 171 --------------------- .../hadoop/hbase/io/hfile/bucket/BucketCache.java | 138 ++++++++++++++++- .../hbase/io/hfile/bucket/TestBucketCache.java | 55 +++++++ 3 files changed, 189 insertions(+), 175 deletions(-) delete mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcurrentIndex.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcurrentIndex.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcurrentIndex.java deleted file mode 100644 index 3b4a1f1..0000000 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcurrentIndex.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Copyright The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.util; - - -import java.util.Comparator; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListSet; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; - -import com.google.common.base.Supplier; - -/** - * A simple concurrent map of sets. This is similar in concept to - * {@link com.google.common.collect.Multiset}, with the following exceptions: - * - * @param Key type - * @param Value type - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class ConcurrentIndex { - - /** Container for the sets, indexed by key */ - private final ConcurrentMap> container; - - /** - * A factory that constructs new instances of the sets if no set is - * associated with a given key. - */ - private final Supplier> valueSetFactory; - - /** - * Creates an instance with a specified factory object for sets to be - * associated with a given key. - * @param valueSetFactory The factory instance - */ - public ConcurrentIndex(Supplier> valueSetFactory) { - this.valueSetFactory = valueSetFactory; - this.container = new ConcurrentHashMap>(); - } - - /** - * Creates an instance using the DefaultValueSetFactory for sets, - * which in turn creates instances of {@link ConcurrentSkipListSet} - * @param valueComparator A {@link Comparator} for value types - */ - public ConcurrentIndex(Comparator valueComparator) { - this(new DefaultValueSetFactory(valueComparator)); - } - - /** - * Associate a new unique value with a specified key. Under the covers, the - * method employs optimistic concurrency: if no set is associated with a - * given key, we create a new set; if another thread comes in, creates, - * and associates a set with the same key in the mean-time, we simply add - * the value to the already created set. - * @param key The key - * @param value An additional unique value we want to associate with a key - */ - public void put(K key, V value) { - Set set = container.get(key); - if (set != null) { - set.add(value); - } else { - set = valueSetFactory.get(); - set.add(value); - Set existing = container.putIfAbsent(key, set); - if (existing != null) { - // If a set is already associated with a key, that means another - // writer has already come in and created the set for the given key. - // Pursuant to an optimistic concurrency policy, in this case we will - // simply add the value to the existing set associated with the key. - existing.add(value); - } - } - } - - /** - * Get all values associated with a specified key or null if no values are - * associated. Note: if the caller wishes to add or removes values - * to under the specified as they're iterating through the returned value, - * they should make a defensive copy; otherwise, a - * {@link java.util.ConcurrentModificationException} may be thrown. - * @param key The key - * @return All values associated with the specified key or null if no values - * are associated with the key. - */ - public Set values(K key) { - return container.get(key); - } - - /** - * Removes the association between a specified key and value. If as a - * result of removing a value a set becomes empty, we remove the given - * set from the mapping as well. - * @param key The specified key - * @param value The value to disassociate with the key - */ - public boolean remove(K key, V value) { - Set set = container.get(key); - boolean success = false; - if (set != null) { - success = set.remove(value); - if (set.isEmpty()) { - container.remove(key); - } - } - return success; - } - - /** - * Default factory class for the sets associated with given keys. Creates - * a {@link ConcurrentSkipListSet} using the comparator passed into the - * constructor. - * @see ConcurrentSkipListSet - * @see Supplier - * @param The value type. Should match value type of the - * ConcurrentIndex instances of this object are passed to. - */ - private static class DefaultValueSetFactory implements Supplier> { - private final Comparator comparator; - - /** - * Creates an instance that passes a specified comparator to the - * {@link ConcurrentSkipListSet} - * @param comparator The specified comparator - */ - public DefaultValueSetFactory(Comparator comparator) { - this.comparator = comparator; - } - - /** - * Creates a new {@link ConcurrentSkipListSet} instance using the - * comparator specified when the class instance was constructed. - * @return The instantiated {@link ConcurrentSkipListSet} object - */ - @Override - public Set get() { - return new ConcurrentSkipListSet(comparator); - } - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 6024958..9714e58 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -31,11 +31,13 @@ import java.io.Serializable; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Comparator; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.PriorityQueue; import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -49,6 +51,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import com.google.common.collect.ImmutableSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -66,14 +69,12 @@ import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager; import org.apache.hadoop.hbase.io.hfile.CachedBlock; import org.apache.hadoop.hbase.io.hfile.HFileBlock; import org.apache.hadoop.hbase.nio.ByteBuff; -import org.apache.hadoop.hbase.util.ConcurrentIndex; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.IdReadWriteLock; import org.apache.hadoop.util.StringUtils; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -1106,8 +1107,7 @@ public class BucketCache implements BlockCache, HeapSize { return 0; } int numEvicted = 0; - List keysForHFile = ImmutableList.copyOf(keySet); - for (BlockCacheKey key : keysForHFile) { + for (BlockCacheKey key : keySet) { if (evictBlock(key)) { ++numEvicted; } @@ -1343,6 +1343,136 @@ public class BucketCache implements BlockCache, HeapSize { } /** + * A simple concurrent map of sets. This is similar in concept to + * {@link com.google.common.collect.Multiset}, with the following exceptions: + *
    + *
  • The set is thread-safe and concurrent: no external locking or + * synchronization is required. This is important for the use case where + * this class is used to index cached blocks by filename for their + * efficient eviction from cache when the file is closed or compacted.
  • + *
  • The expectation is that all entries may only be removed for a key + * once no more additions of values are being made under that key.
  • + *
+ * @param Key type + * @param Value type + */ + @VisibleForTesting + static class ConcurrentIndex { + + /** The length is a power of 2. */ + @SuppressWarnings("rawtypes") + private final Map[] segments; + private static final int MAX_SEGMENTS = 1 << 16; + private static final int DEFAULT_CONCURRENCY_LEVEL = 16; + private final Comparator comparator; + + public ConcurrentIndex(Comparator comparator, + int concurrencyLevel) { + this.comparator = comparator; + int size = Integer.highestOneBit(concurrencyLevel - 1) << 1; + size = Math.min(MAX_SEGMENTS, Math.max(1, size)); + segments = new Map[size]; + for (int i=0; i < segments.length; i++) { + segments[i] = new HashMap>(); + } + } + + @SuppressWarnings("unchecked") + private Map> segments(K key) { + return segments[hash(key) & segments.length - 1]; + } + + /** + * Creates an instance using the Comparator for set. + * @param valueComparator A {@link Comparator} for value types + */ + public ConcurrentIndex(Comparator valueComparator) { + this(valueComparator, DEFAULT_CONCURRENCY_LEVEL); + } + + /** + * Associate a new unique value with a specified key. Under the covers, the + * method employs optimistic concurrency: if no set is associated with a + * given key, we create a new set; if another thread comes in, creates, + * and associates a set with the same key in the mean-time, we simply add + * the value to the already created set. + * @param key The key + * @param value An additional unique value we want to associate with a key + */ + public void put(K key, V value) { + Map> seg = segments(key); + synchronized(seg) { + Set set = seg.get(key); + if (set == null) { + set = new TreeSet(comparator); + seg.put(key, set); + } + set.add(value); + } + } + + /** + * Get all values associated with a specified key or null if no values are + * associated. Note: if the caller wishes to add or removes values + * to under the specified as they're iterating through the returned value, + * they should make a defensive copy; otherwise, a + * {@link java.util.ConcurrentModificationException} may be thrown. + * @param key The key + * @return All values associated with the specified key or null if no values + * are associated with the key. + */ + public Set values(K key) { + Map> seg = segments(key); + synchronized (seg) { + Set set = seg.get(key); + if (set == null) { + return null; + } + return ImmutableSet.copyOf(set); + } + } + + /** + * Removes the association between a specified key and value. If as a + * result of removing a value a set becomes empty, we remove the given + * set from the mapping as well. + * @param key The specified key + * @param value The value to disassociate with the key + */ + public boolean remove(K key, V value) { + Map> seg = segments(key); + synchronized(seg) { + Set set = seg.get(key); + if (set != null) { + if(set.remove(value)) { + if (set.isEmpty()) { + seg.remove(key); + } + return true; + } + } + return false; + } + } + + /** + * Gets the hash code for the specified key. + * This implementation uses the additional hashing routine + * from JDK 1.4. + * + * @param key the key to get a hash value for + * @return the hash value + */ + private int hash(Object key) { + int h = key.hashCode(); + h += ~(h << 9); + h ^= (h >>> 14); + h += (h << 4); + h ^= (h >>> 10); + return h; + } + } + /** * Only used in test * @throws InterruptedException */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java index 54dd8e5..e322ace 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java @@ -18,7 +18,9 @@ */ package org.apache.hadoop.hbase.io.hfile.bucket; +import static junit.framework.TestCase.assertFalse; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.FileNotFoundException; @@ -28,6 +30,8 @@ import java.util.Arrays; import java.util.List; import java.util.Random; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.Comparator; +import java.util.Set; import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; import org.apache.hadoop.hbase.io.hfile.CacheTestUtils; @@ -220,4 +224,55 @@ public class TestBucketCache { assertTrue(cache.getCurrentSize() > 0L); assertTrue("We should have a block!", cache.iterator().hasNext()); } + + @Test + public void testConcurrentIndex() { + + BucketCache.ConcurrentIndex concurrentIndex = + new BucketCache.ConcurrentIndex( + new Comparator() { + @Override + public int compare(BlockCacheKey a, BlockCacheKey b) { + if (a.getOffset() == b.getOffset()) { + return 0; + } else if (a.getOffset() < b.getOffset()) { + return -1; + } + return 1; + } + }); + + String hfileName = "testfile"; + int len = 10; + BlockCacheKey[] blockCacheKeys = new BlockCacheKey[len]; + for (int i = 0; i < len; i++) { + blockCacheKeys[i] = new BlockCacheKey(hfileName, i); + concurrentIndex.put(blockCacheKeys[i].getHfileName(), blockCacheKeys[i]); + } + + Set values = concurrentIndex.values(hfileName); + + assertEquals(len, values.size()); + int index = 0; + for (BlockCacheKey blockCacheKey : values) { + assertEquals(blockCacheKey, blockCacheKeys[index++]); + } + + hfileName = "testfile1"; + BlockCacheKey toRemove1 = new BlockCacheKey(hfileName, 1); + concurrentIndex.put(hfileName, toRemove1); + BlockCacheKey toRemove2 = new BlockCacheKey(hfileName, 1); + concurrentIndex.put(hfileName, toRemove2); + assertTrue(concurrentIndex.remove(hfileName, toRemove1)); + assertNull(concurrentIndex.values(hfileName)); + assertFalse(concurrentIndex.remove(hfileName, toRemove1)); + + BlockCacheKey toRemove3 = new BlockCacheKey(hfileName, 3); + concurrentIndex.put(hfileName, toRemove3); + BlockCacheKey toRemove4 = new BlockCacheKey(hfileName, 4); + concurrentIndex.put(hfileName, toRemove4); + assertTrue(concurrentIndex.remove(hfileName, toRemove3)); + assertEquals(1, concurrentIndex.values(hfileName).size()); + + } } -- 1.9.3 (Apple Git-50)