Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java (revision 1536591) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java (working copy) @@ -30,9 +30,11 @@ import java.io.Serializable; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.PriorityQueue; +import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -58,11 +60,13 @@ import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache; import org.apache.hadoop.hbase.io.hfile.HFileBlock; import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.util.ConcurrentIndex; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.IdLock; import org.apache.hadoop.util.StringUtils; +import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -169,8 +173,20 @@ */ private IdLock offsetLock = new IdLock(); + private final ConcurrentIndex<String, BlockCacheKey> blocksByHFile = + new ConcurrentIndex<String, BlockCacheKey>(new Comparator<BlockCacheKey>() { + @Override + public int compare(BlockCacheKey a, BlockCacheKey b) { + // All keys in one set share an HFile name, so offset alone orders them. + if (a.getOffset() == b.getOffset()) { + return 0; + } else if (a.getOffset() < b.getOffset()) { + return -1; + } + return 1; + } + }); - /** Statistics thread schedule pool (for heavy debugging, could remove) */ private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1, @@ -322,6 +338,7 @@ } else { this.blockNumber.incrementAndGet(); this.heapSize.addAndGet(cachedItem.heapSize()); + blocksByHFile.put(cacheKey.getHfileName(), cacheKey); } } 
@@ -392,6 +409,7 @@ if (bucketEntry.equals(backingMap.remove(cacheKey))) { bucketAllocator.freeBlock(bucketEntry.offset()); realCacheSize.addAndGet(-1 * bucketEntry.getLength()); + blocksByHFile.remove(cacheKey.getHfileName(), cacheKey); if (removedBlock == null) { this.blockNumber.decrementAndGet(); } @@ -914,10 +932,7 @@ } /** - * Evicts all blocks for a specific HFile. This is an expensive operation - * implemented as a linear-time search through all blocks in the cache. - * Ideally this should be a search in a log-access-time map. - * + * Evicts all blocks for a specific HFile. * <p> * This is used for evict-on-close to remove all blocks of a specific HFile. * @@ -925,13 +940,20 @@ */ @Override public int evictBlocksByHfileName(String hfileName) { + // evictBlock removes each key from blocksByHFile while we loop, so + // iterate over a snapshot of the keys rather than the live set + Set<BlockCacheKey> keySet = blocksByHFile.values(hfileName); + if (keySet == null) { + return 0; + } int numEvicted = 0; - for (BlockCacheKey key : this.backingMap.keySet()) { - if (key.getHfileName().equals(hfileName)) { - if (evictBlock(key)) + List<BlockCacheKey> keysForHFile = ImmutableList.copyOf(keySet); + for (BlockCacheKey key : keysForHFile) { + if (evictBlock(key)) { ++numEvicted; } } + return numEvicted; } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java (revision 1536591) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java (working copy) @@ -97,4 +97,8 @@ public DataBlockEncoding getDataBlockEncoding() { return encoding; } + + public long getOffset() { + return offset; + } } Index: hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcurrentIndex.java =================================================================== --- hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcurrentIndex.java (revision 0) +++ hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcurrentIndex.java (working copy) @@ -0,0 +1,173 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + + +import com.google.common.base.Supplier; +import com.google.common.collect.Multimap; + +import java.util.Comparator; +import java.util.ConcurrentModificationException; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListSet; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * A simple concurrent map of sets. This is similar in concept to + * {@link Multimap}, with the following differences: + * <ul> + * <li>The set is thread-safe and concurrent: no external locking or + * synchronization is required. This is important for the use case where + * this class is used to index cached blocks by filename for their + * efficient eviction from cache when the file is closed or compacted.</li> + * <li>The expectation is that all entries may only be removed for a key + * once no more additions of values are being made under that key.</li> + * </ul>
+ * @param <K> Key type + * @param <V> Value type + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class ConcurrentIndex<K, V> { + + /** Container for the sets, indexed by key */ + private final ConcurrentMap<K, Set<V>> container; + + /** + * A factory that constructs new instances of the sets if no set is + * associated with a given key. + */ + private final Supplier<Set<V>> valueSetFactory; + + /** + * Creates an instance with a specified factory object for sets to be + * associated with a given key. + * @param valueSetFactory The factory instance + */ + public ConcurrentIndex(Supplier<Set<V>> valueSetFactory) { + this.valueSetFactory = valueSetFactory; + this.container = new ConcurrentHashMap<K, Set<V>>(); + } + + /** + * Creates an instance using the DefaultValueSetFactory for sets, + * which in turn creates instances of {@link ConcurrentSkipListSet} + * @param valueComparator A {@link Comparator} for value types + */ + public ConcurrentIndex(Comparator<? super V> valueComparator) { + this(new DefaultValueSetFactory<V>(valueComparator)); + } + + /** + * Associate a new unique value with a specified key. Under the covers, the + * method employs optimistic concurrency: if no set is associated with a + * given key, we create a new set; if another thread comes in, creates, + * and associates a set with the same key in the meantime, we simply add + * the value to the already created set. + * @param key The key + * @param value An additional unique value we want to associate with a key + */ + public void put(K key, V value) { + Set<V> set = container.get(key); + if (set != null) { + set.add(value); + } else { + set = valueSetFactory.get(); + set.add(value); + Set<V> existing = container.putIfAbsent(key, set); + if (existing != null) { + // If a set is already associated with a key, that means another + // writer has already come in and created the set for the given key. + // Pursuant to an optimistic concurrency policy, in this case we will + // simply add the value to the existing set associated with the key.
+ existing.add(value); + } + } + } + + /** + * Get all values associated with a specified key, or null if no values are + * associated. The returned set is the live set backing the index, not a + * copy: callers that add or remove values under the key while iterating + * should iterate over a defensive copy. {@link ConcurrentSkipListSet} is + * weakly consistent, but custom sets may throw {@link ConcurrentModificationException}. + * @param key The key + * @return All values associated with the specified key or null if no values + * are associated with the key. + */ + public Set<V> values(K key) { + return container.get(key); + } + + /** + * Removes the association between a specified key and value, dropping the + * key's set from the index if it becomes empty as a result. + * @param key The specified key + * @param value The value to disassociate from the key + * @return true if the value was present in the key's set and was removed + */ + public boolean remove(K key, V value) { + Set<V> set = container.get(key); + boolean success = false; + if (set != null) { + success = set.remove(value); + if (set.isEmpty()) { + container.remove(key, set); // only if still mapped: never drop a newer set + } + } + return success; + } + + /** + * Default factory class for the sets associated with given keys. Creates + * a {@link ConcurrentSkipListSet} using the comparator passed into the + * constructor. + * @see ConcurrentSkipListSet + * @see Supplier + * @param <V> The value type. Should match value type of the + * ConcurrentIndex instances of this object are passed to. + */ + private static class DefaultValueSetFactory<V> implements Supplier<Set<V>> { + private final Comparator<? super V> comparator; + + /** + * Creates an instance that passes a specified comparator to the + * {@link ConcurrentSkipListSet} + * @param comparator The specified comparator + */ + public DefaultValueSetFactory(Comparator<? super V> comparator) { + this.comparator = comparator; + } + + /** + * Creates a new {@link ConcurrentSkipListSet} instance using the + * comparator specified when the class instance was constructed.
+ * @return The instantiated {@link ConcurrentSkipListSet} object + */ + @Override + public Set<V> get() { + return new ConcurrentSkipListSet<V>(comparator); + } + } +} \ No newline at end of file