diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index 489bccc7c9..c7b39cd258 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -28,6 +28,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Date; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; @@ -212,6 +213,7 @@ class ConnectionManager { public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server"; private static final String CLIENT_NONCES_ENABLED_KEY = "hbase.client.nonces.enabled"; private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure"; + private static final String CLIENT_META_CACHE_OPTIMIZE = "hbase.client.meta.cache.optimize"; // An LRU Map of HConnectionKey -> HConnection (TableServer). All // access must be synchronized. 
This map is not private because tests @@ -722,7 +724,9 @@ class ConnectionManager { } this.hostnamesCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true); - this.metaCache = new MetaCache(this.metrics); + + Map optimizedTable = conf.getPropsWithPrefix(CLIENT_META_CACHE_OPTIMIZE); + this.metaCache = new MetaCache(this.metrics, optimizedTable); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java index c9f5e029b6..bb61f959ac 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.client; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; @@ -35,8 +37,10 @@ import org.apache.hadoop.hbase.KeyValue.KVComparator; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.types.ByteCopyOnWriteArrayMap; import org.apache.hadoop.hbase.types.CopyOnWriteArrayMap; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; /** * A cache implementation for region locations from meta. 
@@ -46,6 +50,44 @@ public class MetaCache { private static final Log LOG = LogFactory.getLog(MetaCache.class); + private static class OptimizedTable { + private static String LENGTH_DELM = ","; + + private TableName tableName; + private int prefixLength; + private int splitLength; + + public OptimizedTable(TableName tableName, int prefixLength, int splitLength) { + this.prefixLength = prefixLength; + this.splitLength = splitLength; + this.tableName = tableName; + } + + public int getPrefixLength() { + return prefixLength; + } + + public void setPrefixLength(int prefixLength) { + this.prefixLength = prefixLength; + } + + public int getSplitLength() { + return splitLength; + } + + public void setSplitLength(int splitLength) { + this.splitLength = splitLength; + } + + public TableName getTableName() { + return tableName; + } + + public void setTableName(TableName tableName) { + this.tableName = tableName; + } + } + /** * Map of table to table {@link HRegionLocation}s. */ @@ -61,9 +103,32 @@ public class MetaCache { private final Set cachedServers = new CopyOnWriteArraySet<>(); private final MetricsConnection metrics; + private Map optimizedTables; - public MetaCache(MetricsConnection metrics) { + public MetaCache(MetricsConnection metrics, Map optimizedTable) { this.metrics = metrics; + this.optimizedTables = parseOptimizedTable(optimizedTable); + } + + private Map parseOptimizedTable(Map optimizedTable) { + Map optimizedTableMap = new HashMap<>(); + for(Entry entry : optimizedTable.entrySet()) { + TableName tableName = TableName.valueOf(entry.getKey()); + + int prefixLength = 0; + int splitLength = 1; + String[] lengthStr = entry.getValue().trim().split(OptimizedTable.LENGTH_DELM); + assert lengthStr.length >0 && lengthStr.length <= 2; + if(lengthStr.length == 1) { + splitLength = Integer.valueOf(lengthStr[0]); + }else { + prefixLength = Integer.valueOf(lengthStr[0]); + splitLength = Integer.valueOf(lengthStr[1]); + } + 
optimizedTableMap.put(tableName.getNameAsString(), + new OptimizedTable(tableName, prefixLength, splitLength)); + } + return optimizedTableMap; } /** @@ -196,7 +261,13 @@ public class MetaCache { result = this.cachedRegionLocations.get(tableName); // if tableLocations for this table isn't built yet, make one if (result == null) { - result = new CopyOnWriteArrayMap<>(Bytes.BYTES_COMPARATOR); + if(optimizedTables.containsKey(tableName.getNameAsString())) { + OptimizedTable optimizedTable = optimizedTables.get(tableName.getNameAsString()); + result = new ByteCopyOnWriteArrayMap<>(Bytes.BYTES_COMPARATOR, + optimizedTable.getPrefixLength(), optimizedTable.getSplitLength()); + }else { + result = new CopyOnWriteArrayMap<>(Bytes.BYTES_COMPARATOR); + } ConcurrentNavigableMap old = this.cachedRegionLocations.putIfAbsent(tableName, result); if (old != null) { diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/types/ByteCopyOnWriteArrayMap.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/types/ByteCopyOnWriteArrayMap.java new file mode 100644 index 0000000000..63a9d34350 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/types/ByteCopyOnWriteArrayMap.java @@ -0,0 +1,156 @@ +package org.apache.hadoop.hbase.types; + + +import java.util.Comparator; +import java.util.Map; + +public class ByteCopyOnWriteArrayMap extends CopyOnWriteArrayMap { + + public ByteCopyOnWriteArrayMap() { + super(); + } + + public ByteCopyOnWriteArrayMap(final Comparator keyComparator) { + super(keyComparator); + + this.holder = new ByteArrayHolder<>(keyComparator, new Comparator>() { + @Override + public int compare(Entry o1, Entry o2) { + return keyComparator.compare(o1.getKey(), o2.getKey()); + } + }); + } + + public ByteCopyOnWriteArrayMap(final Comparator keyComparator, int prefixlength, int splitLength) { + super(keyComparator); + + this.holder = new ByteArrayHolder<>(prefixlength, splitLength, keyComparator, new Comparator>() { + @Override + public int 
compare(Entry o1, Entry o2) { + return keyComparator.compare(o1.getKey(), o2.getKey()); + } + }); + } + + public final static class ByteArrayHolder extends ArrayHolder { + private int prefixLength; + private int splitLength; + private int[] lowPos; + private int[] highPos; + + private ByteArrayHolder( + final Comparator keyComparator, + final Comparator> comparator) { + super(keyComparator, comparator); + this.prefixLength = 0; + this.splitLength = 1; + initPos(); + } + + private ByteArrayHolder(int prefixLength, int splitLength, + final Comparator keyComparator, + final Comparator> comparator) { + super(keyComparator, comparator); + this.prefixLength = prefixLength; + this.splitLength = splitLength; + initPos(); + } + + private ByteArrayHolder(COWEntry[] entries, + int startIndex, int endIndex, + final Comparator keyComparator, + Comparator> comparator) { + super(entries, startIndex, endIndex, keyComparator, comparator); + this.prefixLength = 0; + this.splitLength = 1; + initPos(); + } + + private ByteArrayHolder(int prefixLength, int splitLength, COWEntry[] entries, + int startIndex, int endIndex, + final Comparator keyComparator, + Comparator> comparator) { + super(entries, startIndex, endIndex, keyComparator, comparator); + this.prefixLength = prefixLength; + this.splitLength = splitLength; + assert this.splitLength > 0 && this.splitLength < 3; + + initPos(); + } + + @Override + public ArrayHolder newArrayHolder(final Comparator keyComparator, + Comparator> comparator) { + return new ByteArrayHolder(prefixLength, splitLength, keyComparator, comparator); + } + + @Override + public ArrayHolder newArrayHolder(COWEntry[] entries, + int startIndex, int endIndex, + final Comparator keyComparator, + Comparator> comparator) { + ByteArrayHolder byteArrayHolder = new ByteArrayHolder(prefixLength, splitLength, + entries, startIndex, endIndex, keyComparator, comparator); + return byteArrayHolder; + } + + int calcSplit(byte[] key) { + int split = 0; + int i = prefixLength, 
end = prefixLength + splitLength; + while(i < end){ + split <<= 8; + if(i < key.length) { + split += (key[i] & 0xff); + } + i +=1; + } + return split; + } + + private void initPos() { + assert this.prefixLength >= 0 && splitLength >0 && splitLength < 3 + : "Error: prefixLength should be positive and splitLength should be 1 or 2"; + + if(entries.length > 4) { + int splitBufferSize = 1 << (this.splitLength * 8); + lowPos = new int[splitBufferSize]; + highPos = new int[splitBufferSize]; + + int lastSplit = 0; + int low = startIndex, high = startIndex; + for (int i = startIndex; i < endIndex; i++) { + int split = calcSplit(entries[i].key); + if (split == lastSplit) { + high = i; + } else { + for (int j = lastSplit; j < split; j++) { + lowPos[j] = low; + highPos[j] = high + 1; + } + + high = low = i; + lastSplit = split; + } + } + + while(lastSplit < splitBufferSize) { + lowPos[lastSplit] = low; + highPos[lastSplit] = high + 1; + lastSplit ++; + } + } + } + + @Override + int find(byte[] key) { + if(lowPos == null) { + return super.find(key); + }else { + int split = calcSplit(key); + int start = lowPos[split]; + int end = highPos[split]; + return find(key, start, end); + } + } + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/types/CopyOnWriteArrayMap.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/types/CopyOnWriteArrayMap.java index 8de39aec7c..2665861c2e 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/types/CopyOnWriteArrayMap.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/types/CopyOnWriteArrayMap.java @@ -44,8 +44,8 @@ import java.util.concurrent.ConcurrentNavigableMap; @InterfaceStability.Stable public class CopyOnWriteArrayMap extends AbstractMap implements Map, ConcurrentNavigableMap { - private final Comparator keyComparator; - private volatile ArrayHolder holder; + protected final Comparator keyComparator; + protected volatile ArrayHolder holder; public CopyOnWriteArrayMap() { this(new Comparator() { 
@@ -341,7 +341,7 @@ public class CopyOnWriteArrayMap<K, V> extends AbstractMap<K, V>

   @Override
   public synchronized void clear() {
-    this.holder = new ArrayHolder<>(this.holder.keyComparator, this.holder.comparator);
+    this.holder = holder.newArrayHolder(this.holder.keyComparator, this.holder.comparator);
   }

   @Override
@@ -862,10 +862,10 @@ public class CopyOnWriteArrayMap<K, V> extends AbstractMap<K, V>
     }
   }

-  private final static class ArrayHolder<K, V> {
-    private final COWEntry<K, V>[] entries;
-    private final int startIndex;
-    private final int endIndex;
+  protected static class ArrayHolder<K, V> {
+    protected final COWEntry<K, V>[] entries;
+    protected final int startIndex;
+    protected final int endIndex;
     private final Comparator<? super K> keyComparator;
     private final Comparator<Map.Entry<K, V>> comparator;

@@ -885,6 +885,11 @@ public class CopyOnWriteArrayMap<K, V> extends AbstractMap<K, V>
     int find(K needle) {
       int begin = startIndex;
       int end = endIndex - 1;
+      return find(needle, begin, end + 1);
+    }
+
+    int find(K needle, int begin, int end) {
+      end = end - 1;

       while (begin <= end) {
         int mid = begin + ((end - begin) / 2);
@@ -913,14 +918,14 @@ public class CopyOnWriteArrayMap<K, V> extends AbstractMap<K, V>
       // TODO should this restart the array back at start index 0 ?
COWEntry[] newEntries = entries.clone(); newEntries[index] = newEntry; - return new ArrayHolder<>(newEntries, startIndex, endIndex, keyComparator, comparator); + return newArrayHolder(newEntries, startIndex, endIndex, keyComparator, comparator); } ArrayHolder remove(int index) { COWEntry[] newEntries = new COWEntry[getLength() - 1]; System.arraycopy(this.entries, startIndex, newEntries, 0, index - startIndex); System.arraycopy(this.entries, index + 1, newEntries, index, entries.length - index - 1); - return new ArrayHolder<>(newEntries, 0, newEntries.length, keyComparator, comparator); + return newArrayHolder(newEntries, 0, newEntries.length, keyComparator, comparator); } ArrayHolder insert(int index, COWEntry newEntry) { @@ -928,10 +933,10 @@ public class CopyOnWriteArrayMap extends AbstractMap System.arraycopy(this.entries, startIndex, newEntries, 0, index - startIndex); newEntries[index] = newEntry; System.arraycopy(this.entries, index, newEntries, index + 1, getLength() - index); - return new ArrayHolder<>(newEntries, 0, newEntries.length, keyComparator, comparator); + return newArrayHolder(newEntries, 0, newEntries.length, keyComparator, comparator); } - private ArrayHolder( + protected ArrayHolder( final Comparator keyComparator, final Comparator> comparator) { this.endIndex = 0; @@ -941,7 +946,7 @@ public class CopyOnWriteArrayMap extends AbstractMap this.comparator = comparator; } - private ArrayHolder(COWEntry[] entries, + protected ArrayHolder(COWEntry[] entries, int startIndex, int endIndex, final Comparator keyComparator, Comparator> comparator) { @@ -951,9 +956,21 @@ public class CopyOnWriteArrayMap extends AbstractMap this.keyComparator = keyComparator; this.comparator = comparator; } + + public ArrayHolder newArrayHolder(final Comparator keyComparator, + Comparator> comparator) { + return new ArrayHolder(keyComparator, comparator); + } + + public ArrayHolder newArrayHolder(COWEntry[] entries, + int startIndex, int endIndex, + final Comparator 
keyComparator, + Comparator> comparator) { + return new ArrayHolder(entries, startIndex, endIndex, keyComparator, comparator); + } } - private final static class COWEntry implements Map.Entry { + protected final static class COWEntry implements Map.Entry { K key = null; V value = null; diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/types/TestByteCopyOnWriteMaps.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/types/TestByteCopyOnWriteMaps.java new file mode 100644 index 0000000000..28e0843d1a --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/types/TestByteCopyOnWriteMaps.java @@ -0,0 +1,327 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.types; + +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.ArrayList; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ThreadLocalRandom; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +@Category(SmallTests.class) +public class TestByteCopyOnWriteMaps { + + private ConcurrentNavigableMap m; + private ConcurrentNavigableMap csm; + private int count; + + private String buildKey(long i) { + return "user" + String.format("%4d", i); + } + @Before + public void setUp() { + m = new ByteCopyOnWriteArrayMap<>(Bytes.BYTES_COMPARATOR, 4, 2); + csm = new CopyOnWriteArrayMap<>(Bytes.BYTES_COMPARATOR); + + count = 1024; + for(long i=0;i e:csm.entrySet()) { + assertEquals(csm.remove(e.getKey()), m.remove(e.getKey())); + assertEquals(null, m.remove(e.getKey())); + } + } + + + @Test + public void testMultiAdd() throws InterruptedException { + + Thread[] threads = new Thread[10]; + for ( int i =0 ; i tm99 = (CopyOnWriteArrayMap) m.tailMap(Bytes.toBytes(99L), false); + for (Map.Entry e:tm99.entrySet()) { + assertEquals(0, Bytes.compareTo(Bytes.toBytes(n), e.getKey())); + assertEquals(new Long(n), e.getValue()); + n++; + } + } + + @Test + public void testTailMapInclusive() throws Exception { + m.clear(); + m.put(Bytes.toBytes(100L), 100L); + m.put(Bytes.toBytes(101L), 101L); + m.put(Bytes.toBytes(101L), 101L); + m.put(Bytes.toBytes(103L), 103L); + m.put(Bytes.toBytes(99L), 99L); + m.put(Bytes.toBytes(102L), 102L); + + long n = 102; + CopyOnWriteArrayMap tm102 
= (CopyOnWriteArrayMap) m.tailMap(Bytes.toBytes(102L), true);
+    for (Map.Entry e:tm102.entrySet()) {
+      assertEquals(0, Bytes.compareTo(Bytes.toBytes(n), e.getKey()));
+      assertEquals(new Long(n), e.getValue());
+      n++;
+    }
+    n = 99;
+    CopyOnWriteArrayMap tm98 = (CopyOnWriteArrayMap) m.tailMap(Bytes.toBytes(98L), true);
+    for (Map.Entry e:tm98.entrySet()) {
+      assertEquals(0, Bytes.compareTo(Bytes.toBytes(n), e.getKey()));
+      assertEquals(new Long(n), e.getValue());
+      n++;
+    }
+  }
+
+  @Test
+  public void testPut() throws Exception {
+    m.clear();
+    m.put(Bytes.toBytes(100L), 100L);
+    m.put(Bytes.toBytes(101L), 101L);
+    m.put(Bytes.toBytes(101L), 101L);
+    m.put(Bytes.toBytes(103L), 103L);
+    m.put(Bytes.toBytes(99L), 99L);
+    m.put(Bytes.toBytes(102L), 102L);
+    long n = 99;
+
+    for (Map.Entry e:m.entrySet()) {
+      assertEquals(0, Bytes.compareTo(Bytes.toBytes(n), e.getKey()));
+      assertEquals(new Long(n), e.getValue());
+      n++;
+    }
+    assertEquals(5, m.size());
+    assertEquals(false, m.isEmpty());
+  }
+
+
+  @Test
+  public void testLowerKeyPerformance() throws Exception {
+    int count = 10000000;
+    Random random = new Random();
+    ArrayList keys = new ArrayList<>();
+    for ( int i =0; i < count; i ++) {
+      keys.add(buildKey(random.nextInt(10000)).getBytes());
+    }
+
+    ArrayList csmResult = new ArrayList<>();
+    ArrayList mResult = new ArrayList<>();
+    long time = 0;
+
+    //preheat
+    for ( int i =0; i < count; i ++) {
+      byte[] key = keys.get(i);
+      byte[] lowerKeyCsm = m.lowerKey(key);
+      mResult.add(lowerKeyCsm);
+    }
+    time = System.currentTimeMillis();
+    for ( int i =0; i < count; i ++) {
+      byte[] key = keys.get(i);
+      m.lowerKey(key);
+    }
+    time = System.currentTimeMillis() - time;
+    System.out.println("BCM elapsed: " + time + "ms");
+
+    //preheat
+    for ( int i =0; i < count; i ++) {
+      byte[] key = keys.get(i);
+      byte[] lowerKeyCsm = csm.lowerKey(key);
+      csmResult.add(lowerKeyCsm);
+    }
+
+    time = System.currentTimeMillis();
+    for ( int i =0; i < count; i ++) {
+      byte[] key = keys.get(i);
+
csm.lowerKey(key);
+    }
+    time = System.currentTimeMillis() - time;
+    System.out.println("CM elapsed: " + time + "ms");
+  }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index d9ff7eb99d..4458fe4be1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1253,7 +1253,7 @@ 3.0.3 ${compileSource} - 2.5.1 + 2.8.5 ${hadoop-two.version} 3.0.0-SNAPSHOT