From 68819c0509959724b39a1f004b3bc100d6207d4a Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Tue, 27 Oct 2015 12:16:37 -0700 Subject: [PATCH] HBASE-14708 Use copy on write Map for region location cache Summary: Create and use a copy on write map for region location. - Create a copy on write tree map - Create a copy on write map backed by a sorted array. - Create a test for both comparing each with a jdk provided map. - Change MetaCache to use the new map. Test Plan: - org.apache.hadoop.hbase.client.TestFromClientSide - TestHCM Differential Revision: https://reviews.facebook.net/D49545 --- .../org/apache/hadoop/hbase/MetaTableAccessor.java | 8 +- .../org/apache/hadoop/hbase/client/MetaCache.java | 23 +- .../hadoop/hbase/types/CopyOnWriteArrayMap.java | 943 +++++++++++++++++++++ .../hadoop/hbase/types/CopyOnWriteTreeMap.java | 389 +++++++++ .../org/apache/hadoop/hbase/types/TailMapable.java | 32 + .../hadoop/hbase/types/TestCopyOnWriteMaps.java | 320 +++++++ 6 files changed, 1703 insertions(+), 12 deletions(-) create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/types/CopyOnWriteArrayMap.java create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/types/CopyOnWriteTreeMap.java create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/types/TailMapable.java create mode 100644 hbase-common/src/test/java/org/apache/hadoop/hbase/types/TestCopyOnWriteMaps.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java index a1303e6..22ff56d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java @@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos; +import org.apache.hadoop.hbase.types.TailMapable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ExceptionUtil; @@ -972,7 +973,12 @@ public class MetaTableAccessor { // iterate until all serverName columns are seen int replicaId = 0; byte[] serverColumn = getServerColumn(replicaId); - SortedMap serverMap = infoMap.tailMap(serverColumn, false); + SortedMap serverMap = null; + if (infoMap instanceof TailMapable) { + serverMap = ((TailMapable) infoMap).sortedTailMap(serverColumn, false); + } else { + serverMap = infoMap.tailMap(serverColumn, false); + } if (serverMap.isEmpty()) return new RegionLocations(locations); for (Map.Entry entry : serverMap.entrySet()) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java index b23ca70..62d5975 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java @@ -21,10 +21,9 @@ package org.apache.hadoop.hbase.client; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.CopyOnWriteArraySet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -35,6 +34,8 @@ import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.types.CopyOnWriteArrayMap; +import org.apache.hadoop.hbase.types.CopyOnWriteTreeMap; import org.apache.hadoop.hbase.util.Bytes; /** @@ -48,16 +49,16 @@ public class MetaCache { /** * Map of table to table {@link HRegionLocation}s. */ - private final ConcurrentMap> + private final ConcurrentMap> cachedRegionLocations = - new ConcurrentHashMap>(); + new CopyOnWriteArrayMap<>(); // The presence of a server in the map implies it's likely that there is an // entry in cachedRegionLocations that map to this server; but the absence // of a server in this map guarentees that there is no entry in cache that // maps to the absent server. // The access to this attribute must be protected by a lock on cachedRegionLocations - private final Set cachedServers = new ConcurrentSkipListSet(); + private final Set cachedServers = new CopyOnWriteArraySet<>(); private final MetricsConnection metrics; @@ -75,7 +76,7 @@ public class MetaCache { * @return Null or region location found in cache. */ public RegionLocations getCachedLocation(final TableName tableName, final byte [] row) { - ConcurrentSkipListMap tableLocations = + ConcurrentNavigableMap tableLocations = getTableLocations(tableName); Entry e = tableLocations.floorEntry(row); @@ -194,15 +195,15 @@ public class MetaCache { * @param tableName * @return Map of cached locations for passed tableName */ - private ConcurrentSkipListMap + private ConcurrentNavigableMap getTableLocations(final TableName tableName) { // find the map of cached locations for this table - ConcurrentSkipListMap result; + ConcurrentNavigableMap result; result = this.cachedRegionLocations.get(tableName); // if tableLocations for this table isn't built yet, make one if (result == null) { - result = new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); - ConcurrentSkipListMap old = + result = new CopyOnWriteArrayMap<>(Bytes.BYTES_COMPARATOR); + ConcurrentNavigableMap old = this.cachedRegionLocations.putIfAbsent(tableName, result); if (old != null) { return old; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/types/CopyOnWriteArrayMap.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/types/CopyOnWriteArrayMap.java new file mode 100644 index 0000000..0623356 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/types/CopyOnWriteArrayMap.java @@ -0,0 +1,943 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.types; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +import java.util.AbstractMap; +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.Iterator; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Set; +import java.util.SortedSet; +import java.util.concurrent.ConcurrentNavigableMap; + +/** + * A Map that keeps a sorted array in order to provide the concurrent map interface. + * Keeping a sorted array means that it's much more cache line friendly, making reads faster + * than the tree version. + * + * In order to make concurrent reads and writes safe this does a copy on write. + * There can only be one concurrent write at a time. + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class CopyOnWriteArrayMap extends AbstractMap + implements Map, ConcurrentNavigableMap { + private final Comparator keyComparator; + private volatile ArrayHolder holder; + + public CopyOnWriteArrayMap() { + this(new Comparator() { + @Override + public int compare(K o1, K o2) { + return ((Comparable) o1).compareTo(o2); + } + }); + } + + public CopyOnWriteArrayMap(final Comparator keyComparator) { + this.keyComparator = keyComparator; + this.holder = new ArrayHolder<>(new Comparator>() { + @Override + public int compare(Entry o1, Entry o2) { + return keyComparator.compare(o1.getKey(), o2.getKey()); + } + }); + } + + private CopyOnWriteArrayMap(final Comparator keyComparator, ArrayHolder holder) { + this.keyComparator = keyComparator; + this.holder = holder; + } + + /* + Un synchronized read operations. + + No locking. + No waiting + No copying. + + These should all be FAST. + */ + + @Override + public Comparator comparator() { + return keyComparator; + } + + @Override + public ConcurrentNavigableMap tailMap(K fromKey, boolean inclusive) { + ArrayHolder current = this.holder; + COWEntry newEntry = new COWEntry<>(fromKey, (V) null); + int index = current.find(newEntry); + + if (!inclusive && index >= 0) { + index++; + } else if (index < 0) { + index = -(index + 1); + } + return new CopyOnWriteArrayMap<>(this.keyComparator, + new ArrayHolder<>(current.entries, index, current.endIndex, current.comparator)); + } + + @Override + public ConcurrentNavigableMap tailMap(K fromKey) { + return this.tailMap(fromKey, true); + } + + @Override + public K firstKey() { + ArrayHolder current = this.holder; + if (current.getLength() == 0) { + return null; + } + return current.entries[current.startIndex].getKey(); + } + + @Override + public K lastKey() { + ArrayHolder current = this.holder; + if (current.getLength() == 0) { + return null; + } + return current.entries[current.endIndex - 1].getKey(); + } + + @Override + public Entry lowerEntry(K key) { + ArrayHolder current = this.holder; + if (current.getLength() == 0) { + return null; + } + COWEntry newEntry = new COWEntry<>(key, (V) null); + int index = current.find(newEntry); + + // There's a key exactly equal. + if (index >= 0) { + index -= 1; + } else { + index = -(index + 1) - 1; + } + + if (index < current.startIndex || index >= current.endIndex) { + return null; + } + return current.entries[index]; + } + + @Override + public K lowerKey(K key) { + Map.Entry entry = lowerEntry(key); + if (entry == null) { + return null; + } + return entry.getKey(); + } + + @Override + public Entry floorEntry(K key) { + ArrayHolder current = this.holder; + if (current.getLength() == 0) { + return null; + } + int index = current.find(new COWEntry(key, null)); + if (index < 0) { + index = -(index + 1) - 1; + } + if (index < current.startIndex || index >= current.endIndex) { + return null; + } + + return current.entries[index]; + } + + @Override + public K floorKey(K key) { + Map.Entry entry = floorEntry(key); + if (entry == null) { + return null; + } + return entry.getKey(); + } + + @Override + public Entry ceilingEntry(K key) { + ArrayHolder current = this.holder; + if (current.getLength() == 0) { + return null; + } + int index = current.find(new COWEntry(key, null)); + if (index < 0) { + index = -(index + 1); + } + if (index < current.startIndex || index >= current.endIndex) { + return null; + } + + return current.entries[index]; + } + + @Override + public K ceilingKey(K key) { + Map.Entry entry = ceilingEntry(key); + if (entry == null) { + return null; + } + return entry.getKey(); + } + + @Override + public Entry higherEntry(K key) { + ArrayHolder current = this.holder; + if (current.getLength() == 0) { + return null; + } + COWEntry newEntry = new COWEntry<>(key, (V) null); + int index = current.find(newEntry); + + // There's a key exactly equal. + if (index >= 0) { + index += 1; + } else { + index = -(index + 1); + } + + if (index < current.startIndex || index >= current.endIndex) { + return null; + } + return current.entries[index]; + } + + @Override + public K higherKey(K key) { + Map.Entry entry = higherEntry(key); + if (entry == null) { + return null; + } + return entry.getKey(); + } + + @Override + public Entry firstEntry() { + ArrayHolder current = this.holder; + if (current.getLength() == 0) { + return null; + } + return current.entries[current.startIndex]; + } + + @Override + public Entry lastEntry() { + ArrayHolder current = this.holder; + if (current.getLength() == 0) { + return null; + } + return current.entries[current.endIndex - 1]; + } + + @Override + public int size() { + return holder.getLength(); + } + + @Override + public boolean isEmpty() { + return holder.getLength() == 0; + } + + @Override + public boolean containsKey(Object key) { + ArrayHolder current = this.holder; + COWEntry newEntry = new COWEntry<>((K) key, (V) null); + int index = current.find(newEntry); + return index >= 0; + } + + @Override + public V get(Object key) { + COWEntry newEntry = new COWEntry<>((K) key, (V) null); + ArrayHolder current = this.holder; + int index = current.find(newEntry); + if (index >= 0) { + return current.entries[index].getValue(); + } + return null; + } + + @Override + public NavigableSet keySet() { + return new ArrayKeySet<>(this.holder); + } + + @Override + public Collection values() { + return new ArrayValueCollection<>(this.holder); + } + + @Override + public Set> entrySet() { + return new ArrayEntrySet<>(this.holder); + } + + /* + Synchronized write methods. + + Every method should be synchronized. + Only one modification at a time. + + These will be slow. + */ + + + @Override + public synchronized V put(K key, V value) { + COWEntry newEntry = new COWEntry<>(key, value); + ArrayHolder current = this.holder; + int index = current.find(newEntry); + if (index >= 0) { + this.holder = current.replace(index, newEntry); + return current.entries[index].getValue(); + } else { + this.holder = current.insert(-(index + 1), newEntry); + } + return null; + } + + @Override + public synchronized V remove(Object key) { + COWEntry newEntry = new COWEntry<>((K) key, (V) null); + ArrayHolder current = this.holder; + int index = current.find(newEntry); + if (index >= 0) { + this.holder = current.remove(index); + return current.entries[index].getValue(); + } + return null; + } + + @Override + public synchronized void clear() { + this.holder = new ArrayHolder<>(this.holder.comparator); + } + + @Override + public synchronized V putIfAbsent(K key, V value) { + ArrayHolder current = this.holder; + COWEntry newEntry = new COWEntry<>((K) key, (V) value); + int index = current.find(newEntry); + + if (index < 0) { + this.holder = current.insert(-(index + 1), newEntry); + return value; + } + return current.entries[index].getValue(); + } + + @Override + public synchronized boolean remove(Object key, Object value) { + ArrayHolder current = this.holder; + COWEntry newEntry = new COWEntry<>((K) key, (V) value); + int index = current.find(newEntry); + + if (index >= 0 && current.entries[index].getValue().equals(value)) { + this.holder = current.remove(index); + return true; + } + return false; + } + + @Override + public synchronized boolean replace(K key, V oldValue, V newValue) { + ArrayHolder current = this.holder; + COWEntry newEntry = new COWEntry<>(key, newValue); + int index = current.find(newEntry); + + if (index >= 0 && current.entries[index].getValue().equals(oldValue)) { + this.holder = current.replace(index, newEntry); + return true; + } + return false; + } + + @Override + public synchronized V replace(K key, V value) { + ArrayHolder current = this.holder; + COWEntry newEntry = new COWEntry<>(key, value); + int index = current.find(newEntry); + + if (index >= 0) { + this.holder = current.replace(index, newEntry); + return current.entries[index].getValue(); + } + return null; + } + + @Override + public Entry pollFirstEntry() { + throw new UnsupportedOperationException(); + } + + @Override + public Entry pollLastEntry() { + throw new UnsupportedOperationException(); + } + + @Override + public ConcurrentNavigableMap descendingMap() { + throw new UnsupportedOperationException(); + } + + @Override + public NavigableSet navigableKeySet() { + throw new UnsupportedOperationException(); + } + + @Override + public ConcurrentNavigableMap subMap(K fromKey, K toKey) { + throw new UnsupportedOperationException(); + } + + @Override + public ConcurrentNavigableMap headMap(K toKey) { + throw new UnsupportedOperationException(); + } + + @Override + public ConcurrentNavigableMap subMap(K fromKey, + boolean fromInclusive, + K toKey, + boolean toInclusive) { + throw new UnsupportedOperationException(); + } + + @Override + public ConcurrentNavigableMap headMap(K toKey, boolean inclusive) { + throw new UnsupportedOperationException(); + } + + @Override + public NavigableSet descendingKeySet() { + throw new UnsupportedOperationException(); + } + + private final class ArrayKeySet implements NavigableSet { + + private final ArrayHolder holder; + + private ArrayKeySet(ArrayHolder holder) { + this.holder = holder; + } + + @Override + public int size() { + return holder.getLength(); + } + + @Override + public boolean isEmpty() { + return holder.getLength() == 0; + } + + @Override + public boolean contains(Object o) { + ArrayHolder current = this.holder; + + for (int i = current.startIndex; i < current.endIndex; i++) { + if (current.entries[i].getValue().equals(o)) { + return true; + } + } + return false; + } + + @Override + public K lower(K k) { + throw new UnsupportedOperationException(); + } + + @Override + public K floor(K k) { + throw new UnsupportedOperationException(); + } + + @Override + public K ceiling(K k) { + throw new UnsupportedOperationException(); + } + + @Override + public K higher(K k) { + throw new UnsupportedOperationException(); + } + + @Override + public K pollFirst() { + throw new UnsupportedOperationException(); + } + + @Override + public K pollLast() { + throw new UnsupportedOperationException(); + } + + @Override + public Iterator iterator() { + return new ArrayKeyIterator<>(this.holder); + } + + @Override + public NavigableSet descendingSet() { + throw new UnsupportedOperationException(); + } + + @Override + public Iterator descendingIterator() { + throw new UnsupportedOperationException(); + } + + @Override + public NavigableSet subSet(K fromElement, + boolean fromInclusive, + K toElement, + boolean toInclusive) { + throw new UnsupportedOperationException(); + } + + @Override + public NavigableSet headSet(K toElement, boolean inclusive) { + throw new UnsupportedOperationException(); + } + + @Override + public NavigableSet tailSet(K fromElement, boolean inclusive) { + throw new UnsupportedOperationException(); + } + + @Override + public Comparator comparator() { + return (Comparator) keyComparator; + } + + @Override + public SortedSet subSet(K fromElement, K toElement) { + return null; + } + + @Override + public SortedSet headSet(K toElement) { + return null; + } + + @Override + public SortedSet tailSet(K fromElement) { + return null; + } + + @Override + public K first() { + ArrayHolder current = this.holder; + if (current.getLength() == 0) { + return null; + } + return current.entries[current.startIndex].getKey(); + } + + @Override + public K last() { + ArrayHolder current = this.holder; + if (current.getLength() == 0) { + return null; + } + return current.entries[current.endIndex - 1].getKey(); + } + + @Override + public Object[] toArray() { + throw new UnsupportedOperationException(); + } + + @Override + public T[] toArray(T[] a) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean add(K k) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean remove(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean addAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean retainAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean removeAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() { + throw new UnsupportedOperationException(); + } + } + + private final class ArrayValueCollection implements Collection { + + private final ArrayHolder holder; + + private ArrayValueCollection(ArrayHolder holder) { + this.holder = holder; + } + + @Override + public int size() { + return holder.getLength(); + } + + @Override + public boolean isEmpty() { + return holder.getLength() == 0; + } + + @Override + public boolean contains(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public Iterator iterator() { + return new ArrayValueIterator<>(this.holder); + } + + @Override + public Object[] toArray() { + throw new UnsupportedOperationException(); + } + + @Override + public T[] toArray(T[] a) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean add(V v) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean remove(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean addAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean removeAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean retainAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean equals(Object o) { + return false; + } + + @Override + public int hashCode() { + return 0; + } + } + + private final class ArrayKeyIterator implements Iterator { + int index; + private final ArrayHolder holder; + + private ArrayKeyIterator(ArrayHolder holder) { + this.holder = holder; + index = holder.startIndex; + } + + + @Override + public boolean hasNext() { + return index < holder.endIndex; + } + + @Override + public K next() { + return holder.entries[index++].getKey(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove"); + } + } + + private final class ArrayValueIterator implements Iterator { + int index; + private final ArrayHolder holder; + + private ArrayValueIterator(ArrayHolder holder) { + this.holder = holder; + index = holder.startIndex; + } + + + @Override + public boolean hasNext() { + return index < holder.endIndex; + } + + @Override + public V next() { + return holder.entries[index++].getValue(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove"); + } + } + + private final class ArrayEntryIterator implements Iterator> { + + int index; + private final ArrayHolder holder; + + private ArrayEntryIterator(ArrayHolder holder) { + this.holder = holder; + this.index = holder.startIndex; + } + + @Override + public boolean hasNext() { + return index < holder.endIndex; + } + + @Override + public Entry next() { + return holder.entries[index++]; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove"); + } + } + + private final class ArrayEntrySet implements Set> { + private final ArrayHolder holder; + + private ArrayEntrySet(ArrayHolder holder) { + this.holder = holder; + } + + @Override + public int size() { + return holder.getLength(); + } + + @Override + public boolean isEmpty() { + return holder.getLength() == 0; + } + + @Override + public boolean contains(Object o) { + return false; + } + + @Override + public Iterator> iterator() { + return new ArrayEntryIterator<>(this.holder); + } + + @Override + public Object[] toArray() { + throw new UnsupportedOperationException(); + } + + @Override + public T[] toArray(T[] a) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean add(Entry kvEntry) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean remove(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean addAll(Collection> c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean retainAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean removeAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() { + throw new UnsupportedOperationException(); + } + } + + private final static class ArrayHolder { + private final Map.Entry[] entries; + private final int startIndex; + private final int endIndex; + private final Comparator> comparator; + + + int getLength() { + return endIndex - startIndex; + } + + int find(Map.Entry newEntry) { + int index = Arrays.binarySearch(entries, + startIndex, endIndex, + newEntry, comparator); + return index; + } + + ArrayHolder replace(int index, Map.Entry newEntry) { + // TODO should this restart the array back at start index 0 ? + Map.Entry[] newEntries = entries.clone(); + newEntries[index] = newEntry; + return new ArrayHolder<>(newEntries, startIndex, endIndex, comparator); + } + + ArrayHolder remove(int index) { + Map.Entry[] newEntries = new Map.Entry[getLength() - 1]; + System.arraycopy(this.entries, startIndex, newEntries, 0, index - startIndex); + System.arraycopy(this.entries, index + 1, newEntries, index, entries.length - index - 1); + return new ArrayHolder<>(newEntries, 0, newEntries.length, comparator); + } + + ArrayHolder insert(int index, Map.Entry newEntry) { + Map.Entry[] newEntries = new Map.Entry[getLength() + 1]; + System.arraycopy(this.entries, startIndex, newEntries, 0, index - startIndex); + newEntries[index] = newEntry; + System.arraycopy(this.entries, index, newEntries, index + 1, getLength() - index); + return new ArrayHolder<>(newEntries, 0, newEntries.length, comparator); + } + + private ArrayHolder(final Comparator> comparator) { + this.endIndex = 0; + this.startIndex = 0; + this.entries = new Map.Entry[] {}; + this.comparator = comparator; + } + + private ArrayHolder(Map.Entry[] entries, + int startIndex, int endIndex, + Comparator> comparator) { + this.entries = entries; + this.startIndex = startIndex; + this.endIndex = endIndex; + this.comparator = comparator; + } + } + + private final static class COWEntry implements Map.Entry { + private K key = null; + private V value = null; + + COWEntry() { + this(null, null); + } + + COWEntry(K key, V value) { + this.key = key; + this.value = value; + } + + @Override + public K getKey() { + return key; + } + + @Override + public V getValue() { + return value; + } + + @Override + public V setValue(V value) { + V oldValue = this.value; + this.value = value; + return oldValue; + } + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/types/CopyOnWriteTreeMap.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/types/CopyOnWriteTreeMap.java new file mode 100644 index 0000000..5070d76 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/types/CopyOnWriteTreeMap.java @@ -0,0 +1,389 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.types; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +import java.util.AbstractMap; +import java.util.Collection; +import java.util.Comparator; +import java.util.Map; +import java.util.NavigableMap; +import java.util.NavigableSet; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentNavigableMap; + + +/** + * Copy on Write concurrent version of TreeMap + * + * This is thread safe version of a tree map that is optimized for reads. Anything that mutates this + * map will be more expensive than ConcurrentSkiplistMap. However reads are several orders of + * magnitude faster. + * + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class CopyOnWriteTreeMap extends AbstractMap + implements Map, ConcurrentNavigableMap, TailMapable { + + /** + * Storage for the latest published version of the map. + * + * There can be other changes in flight and this will point to a stale version. + * However the map that's being pointed to should never be mutated. + */ + volatile NavigableMap map = null; + + public CopyOnWriteTreeMap() { + this.map = new TreeMap<>(); + } + + public CopyOnWriteTreeMap(Comparator comparator) { + map = new TreeMap<>(comparator); + } + + private CopyOnWriteTreeMap(SortedMap subMap) { + map = new TreeMap<>(subMap); + } + + /* + * Slow but supported operations + */ + + + /** + * This has to do a full copy. It will be slow. SLOW! + * {@inheritDoc} + * + * @param fromKey {@inheritDoc} + * @param fromInclusive {@inheritDoc} + * @param toKey {@inheritDoc} + * @param toInclusive {@inheritDoc} + * @return {@inheritDoc} + */ + @Override + public ConcurrentNavigableMap subMap(K fromKey, + boolean fromInclusive, + K toKey, + boolean toInclusive) { + return new CopyOnWriteTreeMap<>(map.subMap(fromKey, fromInclusive, toKey, toInclusive)); + } + + /** + * This has to do a full copy. It will be slow. SLOW! + * {@inheritDoc} + * + * @param toKey {@inheritDoc} + * @param inclusive {@inheritDoc} + * @return {@inheritDoc} + */ + @Override + public ConcurrentNavigableMap headMap(K toKey, boolean inclusive) { + return new CopyOnWriteTreeMap<>(map.headMap(toKey, inclusive)); + } + + /** + * This has to do a full copy. It will be slow. SLOW! + * {@inheritDoc} + * + * @param fromKey {@inheritDoc} + * @param inclusive {@inheritDoc} + * @return {@inheritDoc} + */ + @Override + public ConcurrentNavigableMap tailMap(K fromKey, boolean inclusive) { + return new CopyOnWriteTreeMap<>(map.tailMap(fromKey, inclusive)); + } + + /** + * This will be a fast version of tailMap. It returns a non-concurrent version. + * This allows not copying the map. + * + * @param fromKey {@inheritDoc} + * @param inclusive {@inheritDoc} + * @return {@inheritDoc} + */ + @Override + public SortedMap sortedTailMap(K fromKey, boolean inclusive) { + return map.tailMap(fromKey, inclusive); + } + + /** + * This has to do a full copy. It will be slow. SLOW! + * {@inheritDoc} + * + * @param fromKey {@inheritDoc} + * @param toKey {@inheritDoc} + * @return {@inheritDoc} + */ + @Override + public ConcurrentNavigableMap subMap(K fromKey, K toKey) { + return new CopyOnWriteTreeMap<>(map.subMap(fromKey, toKey)); + } + + + /** + * This has to do a full copy. It will be slow. SLOW! + * {@inheritDoc} + * + * @param toKey {@inheritDoc} + * @return {@inheritDoc} + */ + @Override + public ConcurrentNavigableMap headMap(K toKey) { + return new CopyOnWriteTreeMap<>(map.headMap(toKey)); + } + + /** + * This has to do a full copy. It will be slow. SLOW! + * {@inheritDoc} + * + * + * @param fromKey {@inheritDoc} + * @return {@inheritDoc} + */ + @Override + public ConcurrentNavigableMap tailMap(K fromKey) { + return new CopyOnWriteTreeMap<>(map.tailMap(fromKey)); + } + + /* + * Supported Non-modifying operations. + * These should all be fast and un-synchronized. + */ + @Override + public Comparator comparator() { + return map.comparator(); + } + + @Override + public int size() { + return map.size(); + } + + @Override + public boolean isEmpty() { + return map.isEmpty(); + } + + @Override + public boolean containsKey(Object key) { + return map.containsKey(key); + } + + @Override + public boolean containsValue(Object value) { + return map.containsValue(value); + } + + @Override + public V get(Object key) { + return map.get(key); + } + + @Override + public K firstKey() { + return map.firstKey(); + } + + @Override + public K lastKey() { + return map.lastKey(); + } + + @Override + public Entry lowerEntry(K key) { + return map.lowerEntry(key); + } + + @Override + public K lowerKey(K key) { + return map.lowerKey(key); + } + + @Override + public Entry floorEntry(K key) { + return map.floorEntry(key); + } + + @Override + public K floorKey(K key) { + return map.floorKey(key); + } + + @Override + public Entry ceilingEntry(K key) { + return map.ceilingEntry(key); + } + + @Override + public K ceilingKey(K key) { + return map.ceilingKey(key); + } + + @Override + public Entry higherEntry(K key) { + return map.higherEntry(key); + } + + @Override + public K higherKey(K key) { + return map.higherKey(key); + } + + @Override + public Entry firstEntry() { + return map.firstEntry(); + } + + @Override + public Entry lastEntry() { + return map.lastEntry(); + } + + @Override + public Entry pollFirstEntry() { + return map.pollFirstEntry(); + } + + @Override + public Entry pollLastEntry() { + return map.pollLastEntry(); + } + + @Override + public ConcurrentNavigableMap descendingMap() { + return new CopyOnWriteTreeMap<>(map.descendingMap()); + } + + @Override + public NavigableSet navigableKeySet() { + return map.navigableKeySet(); + } + + @Override + public NavigableSet keySet() { + return map.navigableKeySet(); + } + + @Override + public NavigableSet descendingKeySet() { + return map.descendingKeySet(); + } + + @Override + public Collection values() { + return map.values(); + } + + @Override + public Set> entrySet() { + return map.entrySet(); + } + + /* + * Supported but modifying operations. + * EVERYTHING that changes map should be synchronized. + */ + + @Override + public synchronized V put(K key, V value) { + TreeMap newMap = new TreeMap<>(map); + V returnValue = newMap.put(key, value); + map = newMap; + return returnValue; + } + + @Override + public synchronized V remove(Object key) { + TreeMap newMap = new TreeMap<>(map); + V returnValue = newMap.remove(key); + map = newMap; + return returnValue; + } + + @Override + public synchronized void putAll(Map m) { + TreeMap newMap = new TreeMap<>(map); + newMap.putAll(m); + map = newMap; + } + + @Override + public synchronized void clear() { + map = new TreeMap<>(comparator()); + } + + @Override + public synchronized V putIfAbsent(K key, V value) { + if (!map.containsKey(key)) { + V result; + TreeMap newMap = new TreeMap<>(map); + result = newMap.put(key, value); + map = newMap; + return result; + } else { + return map.get(key); + } + } + + @Override + public synchronized boolean remove(Object key, Object value) { + V inMap = map.get(key); + if (inMap != null && inMap.equals(value)) { + TreeMap newMap = new TreeMap<>(map); + newMap.remove(key); + map = newMap; + return true; + } else { + return false; + } + } + + @Override + public synchronized boolean replace(K key, V oldValue, V newValue) { + if (oldValue == null || newValue == null) throw new NullPointerException(); + V inMap = map.get(key); + if (inMap != null && inMap.equals(oldValue)) { + TreeMap newMap = new TreeMap<>(map); + newMap.put(key, newValue); + map = newMap; + return true; + } else { + return false; + } + + } + + @Override + public synchronized V replace(K key, V value) { + if (map.containsKey(key)) { + TreeMap newMap = new TreeMap<>(map); + V result = newMap.put(key, value); + map = newMap; + return result; + } else { + return null; + } + } +} \ No newline at end of file diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/types/TailMapable.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/types/TailMapable.java new file mode 100644 index 0000000..6279cab --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/types/TailMapable.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.types; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +import java.util.SortedMap; + + +/** + * interface to show that there's a cheap version of tail map available that doesn't copy. + */ +@InterfaceAudience.Private +public interface TailMapable { + SortedMap sortedTailMap(K fromKey, boolean inclusive); +} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/types/TestCopyOnWriteMaps.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/types/TestCopyOnWriteMaps.java new file mode 100644 index 0000000..5d39813 --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/types/TestCopyOnWriteMaps.java @@ -0,0 +1,320 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.types; + +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import java.util.NavigableMap; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ThreadLocalRandom; + +import static org.junit.Assert.*; + +@Category({MiscTests.class, MediumTests.class}) +@RunWith(Parameterized.class) +public class TestCopyOnWriteMaps { + + @Parameterized.Parameters + public static Collection data() { + return Arrays.asList(new Object[][] { + { new CopyOnWriteArrayMap(), },{ new CopyOnWriteTreeMap(), } + }); + } + + + private static final int MAX_RAND = 10 * 1000 * 1000; + private ConcurrentNavigableMap m; + private ConcurrentSkipListMap csm; + + public TestCopyOnWriteMaps(ConcurrentNavigableMap map) { + this.m = map; + csm = new ConcurrentSkipListMap<>(); + } + + @Before + public void setUp() { + m.clear(); + csm.clear(); + for ( long i = 0 ; i < 10000; i++ ) { + long o = ThreadLocalRandom.current().nextLong(MAX_RAND); + m.put(i, o); + csm.put(i,o); + } + long o = ThreadLocalRandom.current().nextLong(MAX_RAND); + m.put(0L, o); + csm.put(0L,o); + } + + @Test + public void testSize() throws Exception { + assertEquals("Size should always be equal", m.size(), csm.size()); + } + + @Test + public void testIsEmpty() throws Exception { + m.clear(); + assertTrue(m.isEmpty()); + m.put(100L, 100L); + assertFalse(m.isEmpty()); + m.remove(100L); + assertTrue(m.isEmpty()); + } + + @Test + public void testLowerKey() throws Exception { + + assertEquals(csm.lowerKey(400L), m.lowerKey(400L)); + assertEquals(csm.lowerKey(-1L), m.lowerKey(-1L)); + + for ( int i =0 ; i < 100; i ++) { + Long key = ThreadLocalRandom.current().nextLong(); + assertEquals(csm.lowerKey(key), m.lowerKey(key)); + } + } + + @Test + public void testFloorEntry() throws Exception { + for ( int i =0 ; i < 100; i ++) { + Long key = ThreadLocalRandom.current().nextLong(); + assertEquals(csm.floorEntry(key), m.floorEntry(key)); + } + } + + @Test + public void testFloorKey() throws Exception { + for ( int i =0 ; i < 100; i ++) { + Long key = ThreadLocalRandom.current().nextLong(); + assertEquals(csm.floorKey(key), m.floorKey(key)); + } + } + + @Test + public void testCeilingKey() throws Exception { + + assertEquals(csm.ceilingKey(4000L), m.ceilingKey(4000L)); + assertEquals(csm.ceilingKey(400L), m.ceilingKey(400L)); + assertEquals(csm.ceilingKey(-1L), m.ceilingKey(-1L)); + + for ( int i =0 ; i < 100; i ++) { + Long key = ThreadLocalRandom.current().nextLong(); + assertEquals(csm.ceilingKey(key), m.ceilingKey(key)); + } + } + + @Test + public void testHigherKey() throws Exception { + + assertEquals(csm.higherKey(4000L), m.higherKey(4000L)); + assertEquals(csm.higherKey(400L), m.higherKey(400L)); + assertEquals(csm.higherKey(-1L), m.higherKey(-1L)); + + for ( int i =0 ; i < 100; i ++) { + Long key = ThreadLocalRandom.current().nextLong(); + assertEquals(csm.higherKey(key), m.higherKey(key)); + } + } + + @Test + public void testRemove() throws Exception { + for (Map.Entry e:csm.entrySet()) { + assertEquals(csm.remove(e.getKey()), m.remove(e.getKey())); + assertEquals(null, m.remove(e.getKey())); + } + } + + @Test + public void testReplace() throws Exception { + for (Map.Entry e:csm.entrySet()) { + Long newValue = ThreadLocalRandom.current().nextLong(); + assertEquals(csm.replace(e.getKey(), newValue), m.replace(e.getKey(), newValue)); + } + assertEquals(null, m.replace(MAX_RAND + 100L, ThreadLocalRandom.current().nextLong())); + } + + @Test + public void testReplace1() throws Exception { + for (Map.Entry e: csm.entrySet()) { + Long newValue = ThreadLocalRandom.current().nextLong(); + assertEquals(csm.replace(e.getKey(), e.getValue() + 1, newValue), + m.replace(e.getKey(), e.getValue() + 1, newValue)); + assertEquals(csm.replace(e.getKey(), e.getValue(), newValue), + m.replace(e.getKey(), e.getValue(), newValue)); + assertEquals(newValue, m.get(e.getKey())); + assertEquals(csm.get(e.getKey()), m.get(e.getKey())); + } + assertEquals(null, m.replace(MAX_RAND + 100L, ThreadLocalRandom.current().nextLong())); + } + + @Test + public void testMultiAdd() throws InterruptedException { + + Thread[] threads = new Thread[10]; + for ( int i =0 ; i fromCsm = csm.tailMap(50L); + Map fromM = m.tailMap(50L); + assertEquals(fromCsm, fromM); + for (Long value:m.keySet()) { + assertEquals(csm.tailMap(value), m.tailMap(value)); + } + + for ( long i = 0 ; i < 100; i++ ) { + long o = ThreadLocalRandom.current().nextLong(MAX_RAND); + assertEquals(csm.tailMap(o), m.tailMap(o)); + } + } + + @Test + public void testTailMapExclusive() throws Exception { + m.clear(); + m.put(100L, 100L); + m.put(101L, 101L); + m.put(101L, 101L); + m.put(103L, 103L); + m.put(99L, 99L); + m.put(102L, 102L); + + long n = 100L; + CopyOnWriteArrayMap tm99 = (CopyOnWriteArrayMap) m.tailMap(99L, false); + for (Map.Entry e:tm99.entrySet()) { + assertEquals(new Long(n), e.getKey()); + assertEquals(new Long(n), e.getValue()); + n++; + } + } + + @Test + public void testTailMapInclusive() throws Exception { + m.clear(); + m.put(100L, 100L); + m.put(101L, 101L); + m.put(101L, 101L); + m.put(103L, 103L); + m.put(99L, 99L); + m.put(102L, 102L); + + long n = 102; + CopyOnWriteArrayMap tm102 = (CopyOnWriteArrayMap) m.tailMap(102L, true); + for (Map.Entry e:tm102.entrySet()) { + assertEquals(new Long(n), e.getKey()); + assertEquals(new Long(n), e.getValue()); + n++; + } + n = 99; + CopyOnWriteArrayMap tm98 = (CopyOnWriteArrayMap) m.tailMap(98L, true); + for (Map.Entry e:tm98.entrySet()) { + assertEquals(new Long(n), e.getKey()); + assertEquals(new Long(n), e.getValue()); + n++; + } + } + + @Test + public void testPut() throws Exception { + m.clear(); + m.put(100L, 100L); + m.put(101L, 101L); + m.put(101L, 101L); + m.put(103L, 103L); + m.put(99L, 99L); + m.put(102L, 102L); + long n = 99; + + for (Map.Entry e:m.entrySet()) { + assertEquals(new Long(n), e.getKey()); + assertEquals(new Long(n), e.getValue()); + n++; + } + assertEquals(5, m.size()); + assertEquals(false, m.isEmpty()); + } +} \ No newline at end of file -- 2.6.1