From 5d7dc9066fbfcb7515c357db4838c6a28f53b4d8 Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Tue, 27 Oct 2015 12:16:37 -0700 Subject: [PATCH] HBASE-14708 Use copy on write TreeMap for region location cache Summary: Create and use a copy on write tree map for region location. Test Plan: unit tests, followed by IT tests. Differential Revision: https://reviews.facebook.net/D49545 --- .../org/apache/hadoop/hbase/client/MetaCache.java | 22 +- .../hadoop/hbase/types/CopyOnWriteTreeMap.java | 293 +++++++++++++++++++++ .../hadoop/hbase/types/TestCopyOnWriteTreeMap.java | 170 ++++++++++++ 3 files changed, 474 insertions(+), 11 deletions(-) create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/types/CopyOnWriteTreeMap.java create mode 100644 hbase-common/src/test/java/org/apache/hadoop/hbase/types/TestCopyOnWriteTreeMap.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java index b23ca70..36d9238 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java @@ -21,10 +21,9 @@ package org.apache.hadoop.hbase.client; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.CopyOnWriteArraySet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -35,6 +34,7 @@ import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.types.CopyOnWriteTreeMap; import org.apache.hadoop.hbase.util.Bytes; /** @@ -48,16 +48,16 @@ public class MetaCache { /** * Map of table to table {@link HRegionLocation}s. */ - private final ConcurrentMap> + private final ConcurrentMap> cachedRegionLocations = - new ConcurrentHashMap>(); + new CopyOnWriteTreeMap<>(); // The presence of a server in the map implies it's likely that there is an // entry in cachedRegionLocations that map to this server; but the absence // of a server in this map guarentees that there is no entry in cache that // maps to the absent server. // The access to this attribute must be protected by a lock on cachedRegionLocations - private final Set cachedServers = new ConcurrentSkipListSet(); + private final Set cachedServers = new CopyOnWriteArraySet<>(); private final MetricsConnection metrics; @@ -75,7 +75,7 @@ public class MetaCache { * @return Null or region location found in cache. */ public RegionLocations getCachedLocation(final TableName tableName, final byte [] row) { - ConcurrentSkipListMap tableLocations = + ConcurrentNavigableMap tableLocations = getTableLocations(tableName); Entry e = tableLocations.floorEntry(row); @@ -194,15 +194,15 @@ public class MetaCache { * @param tableName * @return Map of cached locations for passed tableName */ - private ConcurrentSkipListMap + private ConcurrentNavigableMap getTableLocations(final TableName tableName) { // find the map of cached locations for this table - ConcurrentSkipListMap result; + ConcurrentNavigableMap result; result = this.cachedRegionLocations.get(tableName); // if tableLocations for this table isn't built yet, make one if (result == null) { - result = new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); - ConcurrentSkipListMap old = + result = new CopyOnWriteTreeMap(Bytes.BYTES_COMPARATOR); + ConcurrentNavigableMap old = this.cachedRegionLocations.putIfAbsent(tableName, result); if (old != null) { return old; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/types/CopyOnWriteTreeMap.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/types/CopyOnWriteTreeMap.java new file mode 100644 index 0000000..4903351 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/types/CopyOnWriteTreeMap.java @@ -0,0 +1,293 @@ +package org.apache.hadoop.hbase.types; + +import java.util.Collection; +import java.util.Comparator; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentNavigableMap; + + +/** + * Copy on Write concurrent version of TreeMap + * + * + * + * @param + * @param + */ +public class CopyOnWriteTreeMap implements Map, ConcurrentNavigableMap { + + volatile TreeMap map = null; + + public CopyOnWriteTreeMap() { + this.map = new TreeMap<>(); + } + + public CopyOnWriteTreeMap(Comparator comparator) { + map = new TreeMap<>(comparator); + } + + + private CopyOnWriteTreeMap(Map subMap) { + map = new TreeMap<>(subMap); + } + + /* + * Slow but supported operations + */ + @Override + public ConcurrentNavigableMap subMap(K fromKey, + boolean fromInclusive, + K toKey, + boolean toInclusive) { + return new CopyOnWriteTreeMap<>(map.subMap(fromKey, fromInclusive, toKey, toInclusive)); + } + + @Override + public ConcurrentNavigableMap headMap(K toKey, boolean inclusive) { + return new CopyOnWriteTreeMap<>(map.headMap(toKey, inclusive)); + } + + @Override + public ConcurrentNavigableMap tailMap(K fromKey, boolean inclusive) { + return new CopyOnWriteTreeMap<>(map.tailMap(fromKey, inclusive)); + } + + + @Override + public ConcurrentNavigableMap subMap(K fromKey, K toKey) { + return new CopyOnWriteTreeMap<>(map.subMap(fromKey, toKey)); + } + + @Override + public ConcurrentNavigableMap headMap(K toKey) { + return new CopyOnWriteTreeMap<>(map.headMap(toKey)); + } + + @Override + public ConcurrentNavigableMap tailMap(K fromKey) { + return new CopyOnWriteTreeMap<>(map.tailMap(fromKey)); + } + + + + /* + * Supported Non-modifying operations. + * These should all be fast and un-synchronized. + */ + @Override + public Comparator comparator() { + return map.comparator(); + } + + @Override + public int size() { + return map.size(); + } + + @Override + public boolean isEmpty() { + return map.isEmpty(); + } + + @Override + public boolean containsKey(Object key) { + return map.containsKey(key); + } + + @Override + public boolean containsValue(Object value) { + return map.containsValue(value); + } + + @Override + public V get(Object key) { + return map.get(key); + } + + @Override + public K firstKey() { + return map.firstKey(); + } + + @Override + public K lastKey() { + return map.lastKey(); + } + + @Override + public Entry lowerEntry(K key) { + return map.lowerEntry(key); + } + + @Override + public K lowerKey(K key) { + return map.lowerKey(key); + } + + @Override + public Entry floorEntry(K key) { + return map.floorEntry(key); + } + + @Override + public K floorKey(K key) { + return map.floorKey(key); + } + + @Override + public Entry ceilingEntry(K key) { + return map.ceilingEntry(key); + } + + @Override + public K ceilingKey(K key) { + return map.ceilingKey(key); + } + + @Override + public Entry higherEntry(K key) { + return map.higherEntry(key); + } + + @Override + public K higherKey(K key) { + return map.higherKey(key); + } + + @Override + public Entry firstEntry() { + return map.firstEntry(); + } + + @Override + public Entry lastEntry() { + return map.lastEntry(); + } + + @Override + public Entry pollFirstEntry() { + return map.pollFirstEntry(); + } + + @Override + public Entry pollLastEntry() { + return map.pollLastEntry(); + } + + @Override + public ConcurrentNavigableMap descendingMap() { + return new CopyOnWriteTreeMap<>(map.descendingMap()); + } + + @Override + public NavigableSet navigableKeySet() { + return map.navigableKeySet(); + } + + @Override + public NavigableSet keySet() { + return map.navigableKeySet(); + } + + @Override + public NavigableSet descendingKeySet() { + return map.descendingKeySet(); + } + + @Override + public Collection values() { + return map.values(); + } + + @Override + public Set> entrySet() { + return map.entrySet(); + } + + /* + * Supported but modifying operations. + * EVERYTHING that changes map should be synchronized. + * + */ + + @Override + public synchronized V put(K key, V value) { + TreeMap newMap = new TreeMap<>(map); + V returnValue = newMap.put(key, value); + map = newMap; + return returnValue; + } + + @Override + public synchronized V remove(Object key) { + TreeMap newMap = new TreeMap<>(map); + V returnValue = newMap.remove(key); + map = newMap; + return returnValue; + } + + @Override + public synchronized void putAll(Map m) { + TreeMap newMap = new TreeMap<>(map); + newMap.putAll(m); + map = newMap; + } + + @Override + public synchronized void clear() { + map = new TreeMap<>(comparator()); + } + + @Override + public synchronized V putIfAbsent(K key, V value) { + if (!map.containsKey(key)) { + V result; + TreeMap newMap = new TreeMap<>(map); + result = newMap.put(key, value); + map = newMap; + return result; + } else { + return map.get(key); + } + } + + @Override + public synchronized boolean remove(Object key, Object value) { + if (map.containsKey(key) && map.get(key).equals(value)) { + TreeMap newMap = new TreeMap<>(map); + map.remove(key); + map = newMap; + return true; + } else { + return false; + } + } + + @Override + public synchronized boolean replace(K key, V oldValue, V newValue) { + if (map.containsKey(key) && map.get(key).equals(oldValue)) { + TreeMap newMap = new TreeMap<>(map); + newMap.put(key, newValue); + map = newMap; + return true; + } else { + return false; + } + + } + + @Override + public synchronized V replace(K key, V value) { + if (map.containsKey(key)) { + TreeMap newMap = new TreeMap<>(map); + V result = map.put(key, value); + map = newMap; + return result; + } else { + return null; + } + } +} \ No newline at end of file diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/types/TestCopyOnWriteTreeMap.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/types/TestCopyOnWriteTreeMap.java new file mode 100644 index 0000000..94f43d0 --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/types/TestCopyOnWriteTreeMap.java @@ -0,0 +1,170 @@ +package org.apache.hadoop.hbase.types; + +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.Map; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ThreadLocalRandom; + +import static org.junit.Assert.*; + +@Category({MiscTests.class, MediumTests.class}) +public class TestCopyOnWriteTreeMap { + + private static final int MAX_RAND = 10 * 1000 * 1000; + private CopyOnWriteTreeMap m; + private ConcurrentSkipListMap csm; + + @Before + public void setUp() throws Exception { + m = new CopyOnWriteTreeMap<>(); + csm = new ConcurrentSkipListMap<>(); + for ( long i = 0 ; i < 10000; i++ ) { + long o = ThreadLocalRandom.current().nextLong(MAX_RAND); + m.put(i, o); + csm.put(i,o); + } + } + + @Test + public void testSize() throws Exception { + assertEquals("Size should always be equal", m.size(), csm.size()); + } + + @Test + public void testIsEmpty() throws Exception { + CopyOnWriteTreeMap myMap = new CopyOnWriteTreeMap<>(); + assertTrue(myMap.isEmpty()); + myMap.put(100L, 100L); + assertFalse(myMap.isEmpty()); + myMap.remove(100L); + assertTrue(myMap.isEmpty()); + } + + @Test + public void testLowerKey() throws Exception { + for ( int i =0 ; i < 100; i ++) { + Long key = ThreadLocalRandom.current().nextLong(); + assertEquals(m.lowerKey(key), csm.lowerKey(key)); + } + } + + @Test + public void testFloorEntry() throws Exception { + for ( int i =0 ; i < 100; i ++) { + Long key = ThreadLocalRandom.current().nextLong(); + assertEquals(m.floorEntry(key), csm.floorEntry(key)); + } + } + + @Test + public void testFloorKey() throws Exception { + for ( int i =0 ; i < 100; i ++) { + Long key = ThreadLocalRandom.current().nextLong(); + assertEquals(m.floorKey(key), csm.floorKey(key)); + } + } + + @Test + public void testRemove() throws Exception { + for (Map.Entry e:csm.entrySet()) { + assertEquals(csm.remove(e.getKey()), m.remove(e.getKey())); + assertEquals(null, m.remove(e.getKey())); + } + } + + @Test + public void testReplace() throws Exception { + for (Map.Entry e:csm.entrySet()) { + Long newValue = ThreadLocalRandom.current().nextLong(); + assertEquals(csm.replace(e.getKey(), newValue), m.replace(e.getKey(), newValue)); + } + assertEquals(null, m.replace(MAX_RAND + 100L, ThreadLocalRandom.current().nextLong())); + } + + @Test + public void testReplace1() throws Exception { + for (Map.Entry e: csm.entrySet()) { + Long newValue = ThreadLocalRandom.current().nextLong(); + assertEquals(csm.replace(e.getKey(), e.getValue() + 1, newValue), + m.replace(e.getKey(), e.getValue() + 1, newValue)); + assertEquals(csm.replace(e.getKey(), e.getValue(), newValue), + m.replace(e.getKey(), e.getValue(), newValue)); + assertEquals(newValue, m.get(e.getKey())); + assertEquals(csm.get(e.getKey()), m.get(e.getKey())); + } + assertEquals(null, m.replace(MAX_RAND + 100L, ThreadLocalRandom.current().nextLong())); + } + + @Test + public void testMultiAdd() throws InterruptedException { + final CopyOnWriteTreeMap myMap = new CopyOnWriteTreeMap<>(); + + Thread[] threads = new Thread[10]; + for ( int i =0 ; i