commit 56204adfbb14298c40038a41cc1289f973640e99 Author: unknown Date: Fri Aug 27 12:10:31 2010 -0700 HBASE-2938 Add Thread-Local Behavior To HTable Pool diff --git a/src/main/java/org/apache/hadoop/hbase/HConstants.java b/src/main/java/org/apache/hadoop/hbase/HConstants.java index bfaa4a1..71e09af 100644 --- a/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -138,6 +138,12 @@ public final class HConstants { /** Parameter name for HBase instance root directory */ public static final String HBASE_DIR = "hbase.rootdir"; + /** Parameter name for HTable client pool type */ + public static final String HBASE_CLIENT_HTABLE_POOL_TYPE = "hbase.client.htable.pool.type"; + + /** Parameter name for HTable client pool size */ + public static final String HBASE_CLIENT_HTABLE_POOL_SIZE = "hbase.client.htable.pool.size"; + /** Used to construct the name of the log directory for a region server * Use '.' as a special character to seperate the log files from table data */ public static final String HREGION_LOGDIR_NAME = ".logs"; diff --git a/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java b/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java index 0a7bc4f..8589bb8 100755 --- a/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java @@ -19,15 +19,15 @@ */ package org.apache.hadoop.hbase.client; +import java.util.Collections; +import java.util.Map; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.util.Bytes; - -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.Map; -import java.util.Queue; +import org.apache.hadoop.hbase.util.SharedMap; +import org.apache.hadoop.hbase.util.SharedMap.PoolType; /** * A simple pool of HTable instances.

@@ -41,36 +41,99 @@ import java.util.Queue; * is {@link Integer#MAX_VALUE}.

*/ public class HTablePool { - private final Map> tables = - Collections.synchronizedMap(new HashMap>()); + private final Map tables; private final Configuration config; - private final int maxSize; private HTableInterfaceFactory tableFactory = new HTableFactory(); /** - * Default Constructor. Default HBaseConfiguration and no limit on pool size. + * Default Constructor, which uses the default HBaseConfiguration and places + * no limit on the pool size. */ public HTablePool() { - this(HBaseConfiguration.create(), Integer.MAX_VALUE); + this(HBaseConfiguration.create()); } /** - * Constructor to set maximum versions and use the specified configuration. + * Construct a pool using the type and size specified in the given configuration. + * + * @param config configuration + */ + public HTablePool(Configuration config) { + this(config, getPoolType(config)); + } + + /** + * Construct a pool using the given size and the type specified in the given configuration. + * * @param config configuration * @param maxSize maximum number of references to keep for each table */ public HTablePool(Configuration config, int maxSize) { - this.config = config; - this.maxSize = maxSize; + this(config, getPoolType(config), maxSize); + } + + /** + * Construct a pool of the given type and the size specified in the given configuration. + * + * @param config configuration + * @param poolType the type of the pool + */ + public HTablePool(Configuration config, PoolType poolType) { + this(config, poolType, getPoolSize(config)); + } + + /** + * + * @param config + * @param poolType + * @param maxSize + */ + public HTablePool(Configuration config, PoolType poolType, int maxSize) { + this(config, poolType, maxSize, new HTableFactory()); } - public HTablePool(Configuration config, int maxSize, HTableInterfaceFactory tableFactory) { + /** + * + * @param config + * @param poolType + * @param maxSize + * @param tableFactory + */ + public HTablePool(Configuration config, PoolType poolType, int maxSize, + HTableInterfaceFactory tableFactory) { this.config = config; - this.maxSize = maxSize; this.tableFactory = tableFactory; + this.tables = Collections + .synchronizedMap(new SharedMap(poolType, + maxSize)); } /** + * Return the pool type specified in the configuration, if it roughly equals either + * the name of {@link PoolType#Reusable} or {@link PoolType#ThreadLocal}, otherwise + * default to the former type. + * + * @param config configuration + * @return either a {@link PoolType#Reusable} or {@link PoolType#ThreadLocal} + */ + private static PoolType getPoolType(Configuration config) { + return PoolType.valueOf(config.get(HConstants.HBASE_CLIENT_HTABLE_POOL_TYPE), + PoolType.Reusable, PoolType.ThreadLocal); + } + + /** + * Return the pool size specified in the configuration, otherwise the maximum allowable + * size (which for all intents and purposes represents an unbounded pool). + * + * @param config + * @return the maximum pool size + */ + private static int getPoolSize(Configuration config) { + return config.getInt(HConstants.HBASE_CLIENT_HTABLE_POOL_SIZE, + Integer.MAX_VALUE); + } + + /** * Get a reference to the specified table from the pool.

* * Create a new one if one is not available. @@ -79,20 +142,8 @@ public class HTablePool { * @throws RuntimeException if there is a problem instantiating the HTable */ public HTableInterface getTable(String tableName) { - LinkedList queue = tables.get(tableName); - if(queue == null) { - queue = new LinkedList(); - tables.put(tableName, queue); - return createHTable(tableName); - } - HTableInterface table; - synchronized(queue) { - table = queue.poll(); - } - if(table == null) { - return createHTable(tableName); - } - return table; + HTableInterface table = tables.get(tableName); + return (table != null) ? table : createHTable(tableName); } /** @@ -115,15 +166,12 @@ public class HTablePool { * @param table table */ public void putTable(HTableInterface table) { - LinkedList queue = tables.get(Bytes.toString(table.getTableName())); - synchronized(queue) { - if(queue.size() >= maxSize) return; - queue.add(table); - } + tables.put(Bytes.toString(table.getTableName()), table); } protected HTableInterface createHTable(String tableName) { - return this.tableFactory.createHTableInterface(config, Bytes.toBytes(tableName)); + return this.tableFactory.createHTableInterface(config, + Bytes.toBytes(tableName)); } /** @@ -136,17 +184,12 @@ public class HTablePool { * @param tableName */ public void closeTablePool(final String tableName) { - Queue queue = tables.get(tableName); - synchronized (queue) { - HTableInterface table = queue.poll(); - while (table != null) { - this.tableFactory.releaseHTableInterface(table); - table = queue.poll(); - } + HTableInterface table; + while ((table = tables.get(tableName)) != null) { + this.tableFactory.releaseHTableInterface(table); } - } - + /** * See {@link #closeTablePool(String)}. * @@ -157,9 +200,6 @@ public class HTablePool { } int getCurrentPoolSize(String tableName) { - Queue queue = tables.get(tableName); - synchronized(queue) { - return queue.size(); - } + return ((SharedMap) tables).size(tableName); } } diff --git a/src/main/java/org/apache/hadoop/hbase/util/ReusablePool.java b/src/main/java/org/apache/hadoop/hbase/util/ReusablePool.java new file mode 100644 index 0000000..759e3b9 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/util/ReusablePool.java @@ -0,0 +1,72 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.Collection; +import java.util.LinkedList; + +import org.apache.hadoop.hbase.util.SharedMap.Pool; + +/** + * The ReusablePool represents a {@link SharedMap.Pool} that builds + * on the {@link LinkedList} class. It essentially allows resources to be + * checked out, at which point it is removed from this pool. When the resource + * is no longer required, it should be returned to the pool in order to be + * reused. + * + *

+ * If {@link #maxSize} is set to {@link Integer#MAX_VALUE}, then the size of the + * pool is unbounded. Otherwise, it caps the number of consumers that can check + * out a resource from this pool to the (non-zero positive) value specified in + * {@link #maxSize}. + *

+ * + * @author Karthick Sankarachary + * + * @param + * the type of the resource + */ +@SuppressWarnings("serial") +public class ReusablePool extends LinkedList implements Pool { + private int maxSize; + + public ReusablePool(int maxSize) { + this.maxSize = maxSize; + + } + + @Override + public R get() { + return poll(); + } + + @Override + public R put(R resource) { + if (size() < maxSize) { + add(resource); + } + return null; + } + + @Override + public Collection values() { + return this; + } +} \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/hbase/util/RoundRobinPool.java b/src/main/java/org/apache/hadoop/hbase/util/RoundRobinPool.java new file mode 100644 index 0000000..0354559 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/util/RoundRobinPool.java @@ -0,0 +1,79 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.ArrayList; +import java.util.Collection; + +import org.apache.hadoop.hbase.util.SharedMap.Pool; + +/** + * The RoundRobinPool represents a {@link SharedMap.Pool}, which + * stores its resources in an {@link ArrayList}. It load-balances access to its + * resources by returning a different resource every time a given key is looked + * up. + * + *

+ * If {@link #maxSize} is set to {@link Integer#MAX_VALUE}, then the size of the + * pool is unbounded. Otherwise, it caps the number of resources in this pool to + * the (non-zero positive) value specified in {@link #maxSize}. + *

+ * + * @author Karthick Sankarachary + * + * @param + * the type of the resource + * + */ +@SuppressWarnings("serial") +class RoundRobinPool extends ArrayList implements Pool { + private int maxSize; + private int nextResource = 0; + + public RoundRobinPool(int maxSize) { + this.maxSize = maxSize; + } + + @Override + public R put(R resource) { + if (size() > maxSize) { + return null; + } + R previousResource = get(); + this.add(resource); + return previousResource; + } + + @Override + public R get() { + if (size() < maxSize) { + return null; + } + nextResource %= size(); + R resource = get(nextResource++); + return resource; + } + + @Override + public Collection values() { + return this; + } + +} \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/hbase/util/SharedMap.java b/src/main/java/org/apache/hadoop/hbase/util/SharedMap.java new file mode 100644 index 0000000..498ab2a --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/util/SharedMap.java @@ -0,0 +1,240 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * + * The SharedMap maps a key to a collection of values, the elements + * of which are managed by a pool. In effect, that collection acts as a shared + * pool of resources, access to which is closely controlled as per the semantics + * of the pool. + * + *

+ * In case the size of the pool is set to a non-zero positive number, that is + * used to cap the number of resources that a pool may contain for any given + * key. A size of {@link Integer#MAX_VALUE} is interpreted as an unbounded pool. + *

+ * + * @param + * the type of the key to the resource + * @param + * the type of the resource being pooled + * + * @author Karthick Sankarachary + */ +public class SharedMap implements Map { + private PoolType poolType; + + private int poolMaxSize; + + private Map> pools = Collections + .synchronizedMap(new HashMap>()); + + public SharedMap(PoolType poolType, int poolMaxSize) { + this.poolType = poolType; + this.poolMaxSize = poolMaxSize; + } + + @Override + public V get(Object key) { + Pool pool = pools.get(key); + return pool != null ? pool.get() : null; + } + + @Override + public V put(K key, V value) { + Pool pool = pools.get(key); + if (pool == null) { + pools.put(key, pool = createPool()); + } + return pool != null ? pool.put(value) : null; + } + + @Override + public V remove(Object key) { + Pool pool = pools.remove(key); + if (pool != null) { + pool.clear(); + } + return null; + } + + public boolean remove(K key, V value) { + Pool pool = pools.get(key); + return pool != null ? pool.remove(value) : false; + } + + @Override + public Collection values() { + Collection values = new ArrayList(); + for (Pool pool : pools.values()) { + Collection poolValues = pool.values(); + if (poolValues != null) { + values.addAll(poolValues); + } + } + return values; + } + + @Override + public boolean isEmpty() { + return pools.isEmpty(); + } + + @Override + public int size() { + return pools.size(); + } + + public int size(K key) { + Pool pool = pools.get(key); + return pool != null ? pool.size() : 0; + } + + @Override + public boolean containsKey(Object key) { + return pools.containsKey(key); + } + + @Override + public boolean containsValue(Object value) { + if (value == null) { + return false; + } + for (Pool pool : pools.values()) { + if (value.equals(pool.get())) { + return true; + } + } + return false; + } + + @Override + public void putAll(Map map) { + for (Map.Entry entry : map.entrySet()) { + put(entry.getKey(), entry.getValue()); + } + } + + @Override + public void clear() { + for (Pool pool : pools.values()) { + pool.clear(); + } + pools.clear(); + } + + @Override + public Set keySet() { + return pools.keySet(); + } + + @Override + public Set> entrySet() { + Set> entries = new HashSet>(); + for (Map.Entry> poolEntry : pools.entrySet()) { + final K poolKey = poolEntry.getKey(); + final Pool pool = poolEntry.getValue(); + for (final V poolValue : pool.values()) { + if (pool != null) { + entries.add(new Map.Entry() { + @Override + public K getKey() { + return poolKey; + } + + @Override + public V getValue() { + return poolValue; + } + + @Override + public V setValue(V value) { + return pool.put(value); + } + }); + } + } + } + return null; + } + + protected interface Pool { + public R get(); + + public R put(R resource); + + public boolean remove(R resource); + + public void clear(); + + public Collection values(); + + public int size(); + } + + public enum PoolType { + Reusable, ThreadLocal, RoundRobin; + + public static PoolType valueOf(String poolTypeName, + PoolType defaultPoolType, PoolType... allowedPoolTypes) { + PoolType poolType = PoolType.valueOf(poolTypeName); + if (poolType != null) { + boolean allowedType = false; + if (poolType.equals(defaultPoolType)) { + allowedType = true; + } else { + if (allowedPoolTypes != null) { + for (PoolType allowedPoolType : allowedPoolTypes) { + if (poolType.equals(allowedPoolType)) { + allowedType = true; + break; + } + } + } + } + if (!allowedType) { + poolType = null; + } + } + return (poolType != null) ? poolType : defaultPoolType; + } + } + + protected Pool createPool() { + switch (poolType) { + case Reusable: + return new ReusablePool(poolMaxSize); + case RoundRobin: + return new RoundRobinPool(poolMaxSize); + case ThreadLocal: + return new ThreadLocalPool(poolMaxSize); + } + return null; + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/util/ThreadLocalPool.java b/src/main/java/org/apache/hadoop/hbase/util/ThreadLocalPool.java new file mode 100644 index 0000000..07df900 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/util/ThreadLocalPool.java @@ -0,0 +1,112 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.hbase.util.SharedMap.Pool; + +/** + * The ThreadLocalPool represents a {@link SharedMap.Pool} that + * builds on the {@link ThreadLocal} class. It essentially binds the resource to + * the thread from which it is accessed. + * + *

+ * If {@link #maxSize} is set to {@link Integer#MAX_VALUE}, then the size of the + * pool is bounded only by the number of threads that add resources to this + * pool. Otherwise, it caps the number of threads that can set a value on this + * {@link ThreadLocal} instance to the (non-zero positive) value specified in + * {@link #maxSize}. + *

+ * + * + * @author Karthick Sankarachary + * + * @param + * the type of the resource + */ +class ThreadLocalPool extends ThreadLocal implements Pool { + private static Map, AtomicInteger> poolSizes = new HashMap, AtomicInteger>(); + + private int maxSize; + + public ThreadLocalPool(int maxSize) { + this.maxSize = maxSize; + } + + @Override + public R put(R resource) { + R previousResource = get(); + if (previousResource == null) { + AtomicInteger poolSize = poolSizes.get(this); + if (poolSize == null) { + poolSizes.put(this, poolSize = new AtomicInteger(0)); + } + if (poolSize.incrementAndGet() > maxSize) { + return null; + } + } + this.set(resource); + return previousResource; + } + + @Override + public void remove() { + super.remove(); + AtomicInteger poolSize = poolSizes.get(this); + if (poolSize != null) { + poolSize.decrementAndGet(); + } + } + + @Override + public int size() { + AtomicInteger poolSize = poolSizes.get(this); + return poolSize != null ? poolSize.get() : 0; + } + + @Override + public boolean remove(R resource) { + R previousResource = super.get(); + if (resource != null && resource.equals(previousResource)) { + remove(); + return true; + } else { + return false; + } + } + + @Override + public void clear() { + super.remove(); + } + + @Override + public Collection values() { + List values = new ArrayList(); + values.add(get()); + return values; + } +} \ No newline at end of file