commit 37f99e33f38ca463d864c808a99c9e6325b9e451 Author: Karthick Sankarachary Date: Sat Aug 28 12:52:34 2010 -0700 HBASE-2939 Improving Client-Side Performance Of HBase diff --git a/src/main/java/org/apache/hadoop/hbase/HConstants.java b/src/main/java/org/apache/hadoop/hbase/HConstants.java index bfaa4a1..6fa5a3f 100644 --- a/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -138,6 +138,12 @@ public final class HConstants { /** Parameter name for HBase instance root directory */ public static final String HBASE_DIR = "hbase.rootdir"; + /** Parameter name for HBase client IPC pool type */ + public static final String HBASE_CLIENT_IPC_POOL_TYPE = "hbase.client.ipc.pool.type"; + + /** Parameter name for HBase client IPC pool size */ + public static final String HBASE_CLIENT_IPC_POOL_SIZE = "hbase.client.ipc.pool.size"; + /** Used to construct the name of the log directory for a region server * Use '.' as a special character to seperate the log files from table data */ public static final String HREGION_LOGDIR_NAME = ".logs"; diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java index 2b5eeb6..ea0a75b 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java @@ -23,6 +23,9 @@ package org.apache.hadoop.hbase.ipc; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.SharedMap; +import org.apache.hadoop.hbase.util.SharedMap.PoolType; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.ObjectWritable; @@ -48,6 +51,7 @@ import java.net.SocketTimeoutException; import java.net.UnknownHostException; import java.util.Hashtable; import 
java.util.Iterator; +import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -65,8 +69,7 @@ public class HBaseClient { private static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.HBaseClient"); - protected final Hashtable connections = - new Hashtable(); + protected final Map connections; protected final Class valueClass; // class of call values protected int counter; // counter for call ids @@ -659,6 +662,7 @@ public class HBaseClient { } this.conf = conf; this.socketFactory = factory; + this.connections = new SharedMap(getPoolType(conf), getPoolSize(conf)); } /** @@ -670,6 +674,31 @@ public class HBaseClient { this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf)); } + /** + * Return the pool type specified in the configuration, if it roughly equals either + * the name of {@link PoolType#RoundRobin} or {@link PoolType#ThreadLocal}, otherwise + * default to the former type. + * + * @param config configuration + * @return either a {@link PoolType#RoundRobin} or {@link PoolType#ThreadLocal} + */ + private static PoolType getPoolType(Configuration config) { + return PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), + PoolType.RoundRobin, PoolType.ThreadLocal); + } + + /** + * Return the pool size specified in the configuration, otherwise 1: a round-robin pool + * hands out nothing until it holds its maximum size, so an unbounded default never reuses.
+ * + * @param config configuration + * @return the maximum pool size + */ + private static int getPoolSize(Configuration config) { + return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, + 1); + } + /** Return the socket factory of this client * * @return this client's socket factory diff --git a/src/main/java/org/apache/hadoop/hbase/util/ReusablePool.java b/src/main/java/org/apache/hadoop/hbase/util/ReusablePool.java new file mode 100644 index 0000000..759e3b9 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/util/ReusablePool.java @@ -0,0 +1,72 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.Collection; +import java.util.LinkedList; + +import org.apache.hadoop.hbase.util.SharedMap.Pool; + +/** + * The ReusablePool represents a {@link SharedMap.Pool} that builds + * on the {@link LinkedList} class. It essentially allows resources to be + * checked out, at which point it is removed from this pool. When the resource + * is no longer required, it should be returned to the pool in order to be + * reused. + * + *

+ * If {@link #maxSize} is set to {@link Integer#MAX_VALUE}, then the size of the + * pool is unbounded. Otherwise, it caps the number of consumers that can check + * out a resource from this pool to the (non-zero positive) value specified in + * {@link #maxSize}. + *

+ *
+ * @author Karthick Sankarachary
+ *
+ * @param <R>
+ *          the type of the resource
+ */
+@SuppressWarnings("serial")
+public class ReusablePool<R> extends LinkedList<R> implements Pool<R> {
+  /** Maximum number of resources this pool keeps checked in at any time. */
+  private final int maxSize;
+
+  /**
+   * Creates an empty pool.
+   *
+   * @param maxSize
+   *          the cap on checked-in resources; {@link Integer#MAX_VALUE}
+   *          makes the pool effectively unbounded
+   */
+  public ReusablePool(int maxSize) {
+    this.maxSize = maxSize;
+  }
+
+  /**
+   * Checks a resource out of the pool. The resource is removed from the pool
+   * and stays out until it is handed back through {@link #put(Object)}.
+   *
+   * @return the resource at the head of the pool, or <code>null</code> if
+   *         the pool is empty
+   */
+  @Override
+  public R get() {
+    return poll();
+  }
+
+  /**
+   * Checks a resource back into the pool. If the pool already holds
+   * {@link #maxSize} resources, the resource is not retained.
+   *
+   * @param resource
+   *          the resource being returned
+   * @return always <code>null</code>
+   */
+  @Override
+  public R put(R resource) {
+    if (size() < maxSize) {
+      add(resource);
+    }
+    return null;
+  }
+
+  /**
+   * @return this pool itself, i.e. a live view of the resources that are
+   *         currently checked in
+   */
+  @Override
+  public Collection<R> values() {
+    return this;
+  }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/hadoop/hbase/util/RoundRobinPool.java b/src/main/java/org/apache/hadoop/hbase/util/RoundRobinPool.java
new file mode 100644
index 0000000..0354559
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/util/RoundRobinPool.java
@@ -0,0 +1,79 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.hadoop.hbase.util.SharedMap.Pool;
+
+/**
+ * The RoundRobinPool represents a {@link SharedMap.Pool}, which
+ * stores its resources in an {@link ArrayList}.
It load-balances access to its + * resources by returning a different resource every time a given key is looked + * up. + * + *

+ * If {@link #maxSize} is set to {@link Integer#MAX_VALUE}, then the size of the + * pool is unbounded. Otherwise, it caps the number of resources in this pool to + * the (non-zero positive) value specified in {@link #maxSize}. + *

+ *
+ * @author Karthick Sankarachary
+ *
+ * @param <R>
+ *          the type of the resource
+ *
+ */
+@SuppressWarnings("serial")
+class RoundRobinPool<R> extends ArrayList<R> implements Pool<R> {
+  /** Maximum number of resources this pool may hold. */
+  private final int maxSize;
+
+  /** Index of the resource that the next successful {@link #get()} returns. */
+  private int nextResource = 0;
+
+  /**
+   * Creates an empty pool.
+   *
+   * @param maxSize
+   *          the cap on the number of pooled resources
+   */
+  public RoundRobinPool(int maxSize) {
+    this.maxSize = maxSize;
+  }
+
+  /**
+   * Adds a resource to the pool unless the pool is already full.
+   *
+   * @param resource
+   *          the resource to add
+   * @return always <code>null</code>; a resource is only ever added, never
+   *         substituted for an existing one
+   */
+  @Override
+  public R put(R resource) {
+    // The bound must be strict. The previous "size() > maxSize" test let a
+    // pool that already held maxSize resources accept one more, and its
+    // get() call advanced the round-robin cursor as a side effect of put.
+    if (size() < maxSize) {
+      add(resource);
+    }
+    return null;
+  }
+
+  /**
+   * Returns the next resource in rotation. While the pool holds fewer than
+   * {@link #maxSize} resources this returns <code>null</code>, which leaves
+   * it to the caller to create a resource and {@link #put(Object)} it.
+   *
+   * @return the next resource, or <code>null</code> if the pool is empty or
+   *         not yet full
+   */
+  @Override
+  public R get() {
+    // The isEmpty() guard avoids a modulo-by-zero when maxSize <= 0.
+    if (isEmpty() || size() < maxSize) {
+      return null;
+    }
+    nextResource %= size();
+    return get(nextResource++);
+  }
+
+  /**
+   * @return this pool itself, i.e. a live view of the pooled resources
+   */
+  @Override
+  public Collection<R> values() {
+    return this;
+  }
+
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/hadoop/hbase/util/SharedMap.java b/src/main/java/org/apache/hadoop/hbase/util/SharedMap.java
new file mode 100644
index 0000000..315e8ef
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/util/SharedMap.java
@@ -0,0 +1,253 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.hbase.util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * + * The SharedMap maps a key to a collection of values, the elements + * of which are managed by a pool. In effect, that collection acts as a shared + * pool of resources, access to which is closely controlled as per the semantics + * of the pool. + * + *

+ * In case the size of the pool is set to a non-zero positive number, that is + * used to cap the number of resources that a pool may contain for any given + * key. A size of {@link Integer#MAX_VALUE} is interpreted as an unbounded pool. + *

+ *
+ * Note that {@link #size()}, {@link #isEmpty()}, {@link #containsKey(Object)}
+ * and {@link #keySet()} are answered per key (one pool per key), whereas
+ * {@link #values()}, {@link #containsValue(Object)} and {@link #entrySet()}
+ * enumerate the individual resources held by the pools.
+ *
+ * @param <K>
+ *          the type of the key to the resource
+ * @param <V>
+ *          the type of the resource being pooled
+ *
+ * @author Karthick Sankarachary
+ */
+public class SharedMap<K, V> implements Map<K, V> {
+  /** The kind of pool created for each key. */
+  private final PoolType poolType;
+
+  /** The cap handed to every pool this map creates. */
+  private final int poolMaxSize;
+
+  /**
+   * One pool per key. The map is a synchronized wrapper, so compound
+   * operations and iteration over its views lock on the map itself.
+   */
+  private final Map<K, Pool<V>> pools = Collections
+      .synchronizedMap(new HashMap<K, Pool<V>>());
+
+  /**
+   * @param poolType
+   *          the kind of pool to create for each key
+   * @param poolMaxSize
+   *          the maximum size of each pool
+   */
+  public SharedMap(PoolType poolType, int poolMaxSize) {
+    this.poolType = poolType;
+    this.poolMaxSize = poolMaxSize;
+  }
+
+  /**
+   * Asks the pool registered under the key for a resource.
+   *
+   * @return whatever the pool's <code>get()</code> yields, or
+   *         <code>null</code> if there is no pool for the key
+   */
+  @Override
+  public V get(Object key) {
+    Pool<V> pool = pools.get(key);
+    return pool != null ? pool.get() : null;
+  }
+
+  /**
+   * Hands the value to the pool registered under the key, creating that pool
+   * first if necessary.
+   *
+   * @return whatever the pool's <code>put()</code> yields
+   */
+  @Override
+  public V put(K key, V value) {
+    Pool<V> pool;
+    // Look-up and creation must be one atomic step, otherwise two threads can
+    // each create a pool for the same key and one of them silently loses its
+    // pool together with the resource it put there.
+    synchronized (pools) {
+      pool = pools.get(key);
+      if (pool == null) {
+        pool = createPool();
+        if (pool != null) {
+          pools.put(key, pool);
+        }
+      }
+    }
+    return pool != null ? pool.put(value) : null;
+  }
+
+  /**
+   * Drops the pool registered under the key and clears it.
+   *
+   * @return always <code>null</code>
+   */
+  @Override
+  public V remove(Object key) {
+    Pool<V> pool = pools.remove(key);
+    if (pool != null) {
+      pool.clear();
+    }
+    return null;
+  }
+
+  /**
+   * Removes a single resource from the pool registered under the key.
+   *
+   * @return true if the pool exists and reported the resource as removed
+   */
+  public boolean remove(K key, V value) {
+    Pool<V> pool = pools.get(key);
+    return pool != null ? pool.remove(value) : false;
+  }
+
+  /**
+   * @return a new collection holding the resources of all pools
+   */
+  @Override
+  public Collection<V> values() {
+    Collection<V> values = new ArrayList<V>();
+    synchronized (pools) {
+      for (Pool<V> pool : pools.values()) {
+        Collection<V> poolValues = pool.values();
+        if (poolValues != null) {
+          values.addAll(poolValues);
+        }
+      }
+    }
+    return values;
+  }
+
+  /** @return true if no key has a pool */
+  @Override
+  public boolean isEmpty() {
+    return pools.isEmpty();
+  }
+
+  /** @return the number of keys that have a pool */
+  @Override
+  public int size() {
+    return pools.size();
+  }
+
+  /** @return the size reported by the key's pool, or 0 if it has none */
+  public int size(K key) {
+    Pool<V> pool = pools.get(key);
+    return pool != null ? pool.size() : 0;
+  }
+
+  @Override
+  public boolean containsKey(Object key) {
+    return pools.containsKey(key);
+  }
+
+  /**
+   * @return true if any pool currently lists the value among its resources
+   */
+  @Override
+  public boolean containsValue(Object value) {
+    if (value == null) {
+      return false;
+    }
+    synchronized (pools) {
+      for (Pool<V> pool : pools.values()) {
+        // Inspect values() instead of calling get(): get() checks a resource
+        // out of a ReusablePool and advances a RoundRobinPool's cursor, so a
+        // mere membership test would have changed the pools.
+        Collection<V> poolValues = pool.values();
+        if (poolValues != null && poolValues.contains(value)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public void putAll(Map<? extends K, ? extends V> map) {
+    for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) {
+      put(entry.getKey(), entry.getValue());
+    }
+  }
+
+  /** Clears every pool and then forgets all of them. */
+  @Override
+  public void clear() {
+    synchronized (pools) {
+      for (Pool<V> pool : pools.values()) {
+        pool.clear();
+      }
+      pools.clear();
+    }
+  }
+
+  @Override
+  public Set<K> keySet() {
+    return pools.keySet();
+  }
+
+  /**
+   * @return a new set with one entry per pooled resource; calling
+   *         <code>setValue</code> on an entry puts the value into the pool
+   *         the entry came from
+   */
+  @Override
+  public Set<Map.Entry<K, V>> entrySet() {
+    Set<Map.Entry<K, V>> entries = new HashSet<Map.Entry<K, V>>();
+    synchronized (pools) {
+      for (Map.Entry<K, Pool<V>> poolEntry : pools.entrySet()) {
+        final K poolKey = poolEntry.getKey();
+        final Pool<V> pool = poolEntry.getValue();
+        if (pool == null) {
+          continue;
+        }
+        Collection<V> poolValues = pool.values();
+        if (poolValues == null) {
+          continue;
+        }
+        for (final V poolValue : poolValues) {
+          entries.add(new Map.Entry<K, V>() {
+            @Override
+            public K getKey() {
+              return poolKey;
+            }
+
+            @Override
+            public V getValue() {
+              return poolValue;
+            }
+
+            @Override
+            public V setValue(V value) {
+              return pool.put(value);
+            }
+          });
+        }
+      }
+    }
+    // This used to return null, discarding the set built above.
+    return entries;
+  }
+
+  /**
+   * The contract between this map and the per-key pools it manages.
+   *
+   * @param <R>
+   *          the type of the pooled resource
+   */
+  protected interface Pool<R> {
+    public R get();
+
+    public R put(R resource);
+
+    public boolean remove(R resource);
+
+    public void clear();
+
+    public Collection<R> values();
+
+    public int size();
+  }
+
+  /** The pool implementations that {@link #createPool()} can build. */
+  public enum PoolType {
+    Reusable, ThreadLocal, RoundRobin;
+
+    /**
+     * Resolves a pool type from a loosely spelled name.
+     *
+     * @param poolTypeName
+     *          the name to resolve, may be null
+     * @param defaultPoolType
+     *          the type returned when the name is unknown or not allowed
+     * @param allowedPoolTypes
+     *          the types, besides the default, that the name may resolve to
+     * @return the matching allowed type, otherwise the default type
+     */
+    public static PoolType valueOf(String poolTypeName,
+        PoolType defaultPoolType, PoolType... allowedPoolTypes) {
+      PoolType poolType = PoolType.fuzzyMatch(poolTypeName);
+      if (poolType == null || poolType.equals(defaultPoolType)) {
+        return defaultPoolType;
+      }
+      if (allowedPoolTypes != null) {
+        for (PoolType allowedPoolType : allowedPoolTypes) {
+          if (poolType.equals(allowedPoolType)) {
+            return poolType;
+          }
+        }
+      }
+      return defaultPoolType;
+    }
+
+    /**
+     * @return the name without hyphens and surrounding whitespace, in lower
+     *         case; the empty string for a null name
+     */
+    public static String fuzzyNormalize(String name) {
+      // Lower-case with a fixed locale: under e.g. a Turkish default locale
+      // "ROUNDROBIN" would otherwise not normalize to "roundrobin".
+      return name != null
+          ? name.replace("-", "").trim().toLowerCase(java.util.Locale.ROOT)
+          : "";
+    }
+
+    /**
+     * @return the pool type whose normalized name equals the normalized
+     *         argument, or null if there is none
+     */
+    public static PoolType fuzzyMatch(String name) {
+      String normalizedName = fuzzyNormalize(name);
+      for (PoolType poolType : values()) {
+        if (normalizedName.equals(fuzzyNormalize(poolType.name()))) {
+          return poolType;
+        }
+      }
+      return null;
+    }
+  }
+
+  /**
+   * @return a new, empty pool of the configured type and maximum size
+   */
+  protected Pool<V> createPool() {
+    switch (poolType) {
+    case Reusable:
+      return new ReusablePool<V>(poolMaxSize);
+    case RoundRobin:
+      return new RoundRobinPool<V>(poolMaxSize);
+    case ThreadLocal:
+      return new ThreadLocalPool<V>(poolMaxSize);
+    }
+    return null;
+  }
+}
diff --git a/src/main/java/org/apache/hadoop/hbase/util/ThreadLocalPool.java b/src/main/java/org/apache/hadoop/hbase/util/ThreadLocalPool.java
new file mode 100644
index 0000000..07df900
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/util/ThreadLocalPool.java
@@ -0,0 +1,112 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.hbase.util.SharedMap.Pool; + +/** + * The ThreadLocalPool represents a {@link SharedMap.Pool} that + * builds on the {@link ThreadLocal} class. It essentially binds the resource to + * the thread from which it is accessed. + * + *

+ * If {@link #maxSize} is set to {@link Integer#MAX_VALUE}, then the size of the + * pool is bounded only by the number of threads that add resources to this + * pool. Otherwise, it caps the number of threads that can set a value on this + * {@link ThreadLocal} instance to the (non-zero positive) value specified in + * {@link #maxSize}. + *

+ *
+ *
+ * @author Karthick Sankarachary
+ *
+ * @param <R>
+ *          the type of the resource
+ */
+class ThreadLocalPool<R> extends ThreadLocal<R> implements Pool<R> {
+  /**
+   * Number of threads that currently hold a resource in this pool. This is
+   * kept per instance: the former static HashMap keyed by pool was mutated
+   * from many threads without synchronization and never released its keys.
+   */
+  private final AtomicInteger poolSize = new AtomicInteger(0);
+
+  /** Maximum number of threads that may hold a resource at the same time. */
+  private final int maxSize;
+
+  /**
+   * @param maxSize
+   *          the cap on the number of threads holding a resource
+   */
+  public ThreadLocalPool(int maxSize) {
+    this.maxSize = maxSize;
+  }
+
+  /**
+   * Binds the resource to the calling thread. A thread that does not hold a
+   * resource yet is only admitted while fewer than {@link #maxSize} threads
+   * hold one. Putting <code>null</code> releases the calling thread's
+   * resource.
+   *
+   * @param resource
+   *          the resource to bind to the calling thread
+   * @return the resource previously bound to the calling thread, or
+   *         <code>null</code> if there was none or the pool is full
+   */
+  @Override
+  public R put(R resource) {
+    R previousResource = get();
+    if (resource == null) {
+      remove();
+      return previousResource;
+    }
+    if (previousResource == null) {
+      if (poolSize.incrementAndGet() > maxSize) {
+        // Give the slot back; otherwise every rejected put would inflate
+        // the count a little further.
+        poolSize.decrementAndGet();
+        return null;
+      }
+    }
+    this.set(resource);
+    return previousResource;
+  }
+
+  /**
+   * Releases the calling thread's resource, if it holds one.
+   */
+  @Override
+  public void remove() {
+    // Only a thread that actually holds a resource occupies a slot.
+    if (super.get() != null) {
+      poolSize.decrementAndGet();
+    }
+    super.remove();
+  }
+
+  /**
+   * @return the number of threads currently holding a resource
+   */
+  @Override
+  public int size() {
+    return poolSize.get();
+  }
+
+  /**
+   * Releases the calling thread's resource if it equals the given one.
+   *
+   * @return true if the resource was released
+   */
+  @Override
+  public boolean remove(R resource) {
+    R previousResource = super.get();
+    if (resource != null && resource.equals(previousResource)) {
+      remove();
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Releases the calling thread's resource. A {@link ThreadLocal} gives no
+   * access to the values of other threads, so their resources stay bound.
+   */
+  @Override
+  public void clear() {
+    remove();
+  }
+
+  /**
+   * @return a new list holding the calling thread's resource, or an empty
+   *         list if the calling thread holds none
+   */
+  @Override
+  public Collection<R> values() {
+    List<R> values = new ArrayList<R>();
+    R resource = get();
+    if (resource != null) {
+      values.add(resource);
+    }
+    return values;
+  }
+}
\ No newline at end of file