Index: src/contrib/hbase/src/java/org/onelab/filter/RetouchedBloomFilter.java
===================================================================
--- src/contrib/hbase/src/java/org/onelab/filter/RetouchedBloomFilter.java (revision 599675)
+++ src/contrib/hbase/src/java/org/onelab/filter/RetouchedBloomFilter.java (working copy)
@@ -50,9 +50,10 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.Random;
+import java.util.Vector;
/**
* Implements a retouched Bloom filter, as defined in the CoNEXT 2006 paper.
@@ -76,12 +77,12 @@
/**
* KeyList vector (or ElementList Vector, as defined in the paper) of false positives.
*/
- ArrayList<Key>[] fpVector;
+ Vector<Key>[] fpVector;
/**
* KeyList vector of keys recorded in the filter.
*/
- ArrayList<Key>[] keyVector;
+ Vector<Key>[] keyVector;
/**
* Ratio vector.
@@ -158,7 +159,7 @@
* Adds a list of false positive information to this retouched Bloom filter.
* @param keys The list of false positive.
*/
- public void addFalsePositive(ArrayList<Key> keys){
+ public void addFalsePositive(List<Key> keys){
if(keys == null) {
throw new NullPointerException("ArrayList can not be null");
}
@@ -306,8 +307,8 @@
throw new ArrayIndexOutOfBoundsException(index);
}
- ArrayList<Key> kl = keyVector[index];
- ArrayList<Key> fpl = fpVector[index];
+ List<Key> kl = keyVector[index];
+ List<Key> fpl = fpVector[index];
// update key list
int listSize = kl.size();
@@ -339,7 +340,7 @@
* @param k The key to remove.
* @param vector The counting vector associated to the key.
*/
- private void removeKey(Key k, ArrayList<Key>[] vector) {
+ private void removeKey(Key k, List<Key>[] vector) {
if(k == null) {
throw new NullPointerException("Key can not be null");
}
@@ -369,7 +370,7 @@
}//end for - i
}//end computeRatio()
- private double getWeight(ArrayList<Key> keyList) {
+ private double getWeight(List<Key> keyList) {
double weight = 0.0;
for(Key k: keyList) {
weight += k.getWeight();
@@ -382,13 +383,13 @@
*/
@SuppressWarnings("unchecked")
private void createVector() {
- fpVector = new ArrayList[vectorSize];
- keyVector = new ArrayList[vectorSize];
+ fpVector = new Vector[vectorSize];
+ keyVector = new Vector[vectorSize];
ratio = new double[vectorSize];
for(int i = 0; i < vectorSize; i++) {
- fpVector[i] = new ArrayList<Key>();
- keyVector[i] = new ArrayList<Key>();
+ fpVector[i] = new Vector<Key>();
+ keyVector[i] = new Vector<Key>();
ratio[i] = 0.0;
}//end for -i
}//end createVector()
@@ -422,14 +423,14 @@
public void write(DataOutput out) throws IOException {
super.write(out);
for(int i = 0; i < fpVector.length; i++) {
- ArrayList<Key> list = fpVector[i];
+ List<Key> list = fpVector[i];
out.writeInt(list.size());
for(Key k: list) {
k.write(out);
}
}
for(int i = 0; i < keyVector.length; i++) {
- ArrayList<Key> list = keyVector[i];
+ List<Key> list = keyVector[i];
out.writeInt(list.size());
for(Key k: list) {
k.write(out);
@@ -446,7 +447,7 @@
super.readFields(in);
createVector();
for(int i = 0; i < fpVector.length; i++) {
- ArrayList<Key> list = fpVector[i];
+ List<Key> list = fpVector[i];
int size = in.readInt();
for(int j = 0; j < size; j++) {
Key k = new Key();
@@ -455,7 +456,7 @@
}
}
for(int i = 0; i < keyVector.length; i++) {
- ArrayList<Key> list = keyVector[i];
+ List<Key> list = keyVector[i];
int size = in.readInt();
for(int j = 0; j < size; j++) {
Key k = new Key();
@@ -478,8 +479,8 @@
RetouchedBloomFilter other = (RetouchedBloomFilter)o;
for(int i = 0; result == 0 && i < fpVector.length; i++) {
- ArrayList<Key> mylist = fpVector[i];
- ArrayList<Key> otherlist = other.fpVector[i];
+ List<Key> mylist = fpVector[i];
+ List<Key> otherlist = other.fpVector[i];
for(int j = 0; result == 0 && j < mylist.size(); j++) {
result = mylist.get(j).compareTo(otherlist.get(j));
@@ -487,8 +488,8 @@
}
for(int i = 0; result == 0 && i < keyVector.length; i++) {
- ArrayList<Key> mylist = keyVector[i];
- ArrayList<Key> otherlist = other.keyVector[i];
+ List<Key> mylist = keyVector[i];
+ List<Key> otherlist = other.keyVector[i];
for(int j = 0; result == 0 && j < mylist.size(); j++) {
result = mylist.get(j).compareTo(otherlist.get(j));
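
A note on the ArrayList -> Vector swap above: Vector's individual mutators are
synchronized on the instance, which is what makes the per-element add/remove
traffic on fpVector and keyVector safe under concurrent callers. Compound
actions still need client-side locking, though. A minimal illustrative sketch
(class and method names here are mine, not part of the patch):

    import java.util.List;
    import java.util.Vector;

    public class VectorUsageSketch {
      // Vector guards each mutator with its own monitor, so single-element
      // add() calls from multiple threads cannot corrupt the backing array.
      private final List<Integer> keys = new Vector<Integer>();

      void record(int k) {
        keys.add(k);  // atomic: runs under the Vector's monitor
      }

      // Compound actions are NOT atomic: size() and remove() are each
      // synchronized, but another thread can interleave between them,
      // so the pair still needs an explicit lock on the list.
      int removeLast() {
        synchronized (keys) {
          return keys.remove(keys.size() - 1);
        }
      }
    }
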
Index: src/contrib/hbase/src/java/org/onelab/filter/Filter.java
===================================================================
--- src/contrib/hbase/src/java/org/onelab/filter/Filter.java (revision 599675)
+++ src/contrib/hbase/src/java/org/onelab/filter/Filter.java (working copy)
@@ -52,8 +52,8 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import org.apache.hadoop.io.WritableComparable;
/**
@@ -146,7 +146,7 @@
* Adds a list of keys to this filter.
* @param keys The list of keys.
*/
- public void add(ArrayList<Key> keys){
+ public void add(List<Key> keys){
if(keys == null) {
throw new IllegalArgumentException("ArrayList may not be null");
}
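
Widening add(ArrayList) to add(List) also pays off at call sites: any List
implementation can now be passed straight in, without a defensive copy into an
ArrayList. A hypothetical caller, assuming Key's byte[] constructor from the
onelab API:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Vector;

    class FilterAddSketch {
      // Filter and Key are the classes patched above; the calls below are
      // illustrative, not taken from the patch.
      static void load(Filter filter) {
        List<Key> fixed = Arrays.asList(new Key("row1".getBytes()));
        filter.add(fixed);   // a fixed-size List: fine now, rejected before

        List<Key> shared = new Vector<Key>();
        shared.add(new Key("row2".getBytes()));
        filter.add(shared);  // a synchronized Vector is a List too
      }
    }
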
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java (revision 599675)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java (working copy)
@@ -28,6 +28,8 @@
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -113,10 +115,10 @@
private Set<Text> closedTables;
// Set of tables currently being located
- private HashSet<Text> tablesBeingLocated;
+ private Set<Text> tablesBeingLocated;
// Known region HServerAddress.toString() -> HRegionInterface
- private HashMap<String, HRegionInterface> servers;
+ private Map<String, HRegionInterface> servers;
/**
* constructor
@@ -145,13 +147,14 @@
this.master = null;
this.masterChecked = false;
- this.tablesToServers = Collections.synchronizedMap(
- new HashMap<Text, SortedMap<Text, HRegionLocation>>());
+ this.tablesToServers =
+ new ConcurrentHashMap<Text, SortedMap<Text, HRegionLocation>>();
this.closedTables = Collections.synchronizedSet(new HashSet<Text>());
- this.tablesBeingLocated = new HashSet<Text>();
+ this.tablesBeingLocated = Collections.synchronizedSet(
+ new HashSet<Text>());
- this.servers = new HashMap<String, HRegionInterface>();
+ this.servers = new ConcurrentHashMap<String, HRegionInterface>();
}
/** {@inheritDoc} */
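
On the synchronizedMap -> ConcurrentHashMap swaps above: beyond dropping the
single exclusive lock, ConcurrentHashMap offers atomic check-then-act via
putIfAbsent (available since Java 5), which is exactly the idiom a server
cache like servers wants. A sketch under those assumptions (class and method
names are illustrative, not from this patch):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    class ServerCacheSketch {
      private final ConcurrentMap<String, HRegionInterface> servers =
        new ConcurrentHashMap<String, HRegionInterface>();

      // Under Collections.synchronizedMap, "containsKey then put" is two
      // separate lock acquisitions and can race; putIfAbsent makes the
      // check-then-act a single atomic step.
      HRegionInterface cache(String address, HRegionInterface candidate) {
        HRegionInterface prior = servers.putIfAbsent(address, candidate);
        return prior != null ? prior : candidate;
      }
    }
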
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (revision 599675)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (working copy)
@@ -99,9 +99,11 @@
void snapshot() {
this.lock.writeLock().lock();
try {
- if (memcache.size() != 0) {
- snapshot.putAll(memcache);
- memcache.clear();
+ synchronized (memcache) {
+ if (memcache.size() != 0) {
+ snapshot.putAll(memcache);
+ memcache.clear();
+ }
}
} finally {
this.lock.writeLock().unlock();
@@ -149,9 +151,14 @@
List<byte []> get(final HStoreKey key, final int numVersions) {
this.lock.readLock().lock();
try {
- ArrayList<byte []> results = internalGet(memcache, key, numVersions);
- results.addAll(results.size(),
+ List<byte []> results;
+ synchronized (memcache) {
+ results = internalGet(memcache, key, numVersions);
+ }
+ synchronized (snapshot) {
+ results.addAll(results.size(),
internalGet(snapshot, key, numVersions - results.size()));
+ }
return results;
} finally {
@@ -170,8 +177,12 @@
void getFull(HStoreKey key, SortedMap<Text, byte []> results) {
this.lock.readLock().lock();
try {
- internalGetFull(memcache, key, results);
- internalGetFull(snapshot, key, results);
+ synchronized (memcache) {
+ internalGetFull(memcache, key, results);
+ }
+ synchronized (snapshot) {
+ internalGetFull(snapshot, key, results);
+ }
} finally {
this.lock.readLock().unlock();
@@ -248,11 +259,15 @@
List<HStoreKey> getKeys(final HStoreKey origin, final int versions) {
this.lock.readLock().lock();
try {
- List<HStoreKey> results =
- internalGetKeys(this.memcache, origin, versions);
- results.addAll(results.size(), internalGetKeys(snapshot, origin,
- versions == HConstants.ALL_VERSIONS ? versions :
- (versions - results.size())));
+ List<HStoreKey> results;
+ synchronized (memcache) {
+ results = internalGetKeys(this.memcache, origin, versions);
+ }
+ synchronized (snapshot) {
+ results.addAll(results.size(), internalGetKeys(snapshot, origin,
+ versions == HConstants.ALL_VERSIONS ? versions :
+ (versions - results.size())));
+ }
return results;
} finally {
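
The extra synchronized blocks above deserve a comment: this.lock is a
read-write lock and, presumably as elsewhere in HStore, ordinary updates run
under the read lock (only snapshot() takes the write lock), so several threads
can be touching memcache at once and the TreeMap needs its own monitor. A
stripped-down sketch of that discipline (simplified types, not the HStore
code itself):

    import java.util.SortedMap;
    import java.util.TreeMap;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class MemcacheLockingSketch {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private final SortedMap<String, byte[]> memcache =
        new TreeMap<String, byte[]>();

      // The read lock only excludes snapshot(); it does NOT exclude other
      // adders, so two threads can be here at once and the map needs a monitor.
      void add(String key, byte[] value) {
        lock.readLock().lock();
        try {
          synchronized (memcache) {
            memcache.put(key, value);
          }
        } finally {
          lock.readLock().unlock();
        }
      }

      byte[] get(String key) {
        lock.readLock().lock();
        try {
          synchronized (memcache) {  // a concurrent add() may be mutating it
            return memcache.get(key);
          }
        } finally {
          lock.readLock().unlock();
        }
      }
    }
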
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java (revision 599675)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java (working copy)
@@ -104,10 +104,12 @@
* @return Region server added.
*/
public RegionServerThread addRegionServer() throws IOException {
- RegionServerThread t = new RegionServerThread(new HRegionServer(conf),
- this.regionThreads.size());
- this.regionThreads.add(t);
- return t;
+ synchronized (regionThreads) {
+ RegionServerThread t = new RegionServerThread(new HRegionServer(conf),
+ this.regionThreads.size());
+ this.regionThreads.add(t);
+ return t;
+ }
}
/** runs region servers */
@@ -146,8 +148,10 @@
* @return Name of region server that just went down.
*/
public String waitOnRegionServer(int serverNumber) {
- RegionServerThread regionServerThread =
- this.regionThreads.remove(serverNumber);
+ RegionServerThread regionServerThread;
+ synchronized (regionThreads) {
+ regionServerThread = this.regionThreads.remove(serverNumber);
+ }
while (regionServerThread.isAlive()) {
try {
LOG.info("Waiting on " +
@@ -193,8 +197,10 @@
*/
public String startup() {
this.master.start();
- for (RegionServerThread t: this.regionThreads) {
- t.start();
+ synchronized (regionThreads) {
+ for (RegionServerThread t: this.regionThreads) {
+ t.start();
+ }
}
return this.master.getMasterAddress().toString();
}
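
The pattern above, synchronizing on regionThreads itself, follows the
client-side locking that Collections.synchronizedList documents: individual
calls are atomic, but size()-then-add and iteration are compound operations
and need one monitor held across the whole step. A minimal sketch (names are
illustrative, not from LocalHBaseCluster):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    class ThreadRegistrySketch {
      private final List<Thread> threads =
        Collections.synchronizedList(new ArrayList<Thread>());

      Thread addWorker(Runnable r) {
        synchronized (threads) {       // size() + add() as one atomic step
          Thread t = new Thread(r, "worker-" + threads.size());
          threads.add(t);
          return t;
        }
      }

      void startAll() {
        synchronized (threads) {       // iteration requires the list's monitor
          for (Thread t : threads) {
            t.start();
          }
        }
      }
    }
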
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexConfiguration.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexConfiguration.java (revision 599675)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexConfiguration.java (working copy)
@@ -23,7 +23,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.io.StringWriter;
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
@@ -105,7 +105,8 @@
}
}
- private HashMap<String, ColumnConf> columnMap = new HashMap<String, ColumnConf>();
+ private Map<String, ColumnConf> columnMap =
+ new ConcurrentHashMap<String, ColumnConf>();
public Iterator<String> columnNameIterator() {
return columnMap.keySet().iterator();
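
Worth noting for columnNameIterator(): ConcurrentHashMap's iterators are
weakly consistent, so a caller iterating while another thread mutates the map
no longer risks ConcurrentModificationException, though it may or may not
observe a concurrent insert. A small demonstration (illustrative values, not
from IndexConfiguration):

    import java.util.Iterator;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    class WeaklyConsistentIterDemo {
      public static void main(String[] args) {
        Map<String, String> columns = new ConcurrentHashMap<String, String>();
        columns.put("contents", "conf1");
        Iterator<String> it = columns.keySet().iterator();
        columns.put("anchor", "conf2");  // concurrent mutation is fine...
        while (it.hasNext()) {
          System.out.println(it.next()); // ...no CME; "anchor" may or may
        }                                // not be visible to this iterator
      }
    }
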
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (revision 599675)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (working copy)
@@ -21,7 +21,6 @@
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -29,8 +28,8 @@
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
-import java.util.Vector;
import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -142,13 +141,13 @@
LOG.info("starting merge of regions: " + a.getRegionName() + " and " +
b.getRegionName() + " into new region " + newRegionInfo.toString());
- Map<Text, Vector<HStoreFile>> byFamily =
- new TreeMap<Text, Vector<HStoreFile>>();
+ Map<Text, List<HStoreFile>> byFamily =
+ new TreeMap<Text, List<HStoreFile>>();
byFamily = filesByFamily(byFamily, a.close());
byFamily = filesByFamily(byFamily, b.close());
- for (Map.Entry<Text, Vector<HStoreFile>> es : byFamily.entrySet()) {
+ for (Map.Entry<Text, List<HStoreFile>> es : byFamily.entrySet()) {
Text colFamily = es.getKey();
- Vector<HStoreFile> srcFiles = es.getValue();
+ List<HStoreFile> srcFiles = es.getValue();
HStoreFile dst = new HStoreFile(conf, merges,
HRegionInfo.encodeRegionName(newRegionInfo.getRegionName()),
colFamily, Math.abs(rand.nextLong()));
@@ -175,12 +174,12 @@
* @param storeFiles Store files to process.
* @return Returns byFamily
*/
- private static Map<Text, Vector<HStoreFile>> filesByFamily(
- Map<Text, Vector<HStoreFile>> byFamily, Vector<HStoreFile> storeFiles) {
+ private static Map<Text, List<HStoreFile>> filesByFamily(
+ Map<Text, List<HStoreFile>> byFamily, List<HStoreFile> storeFiles) {
for(HStoreFile src: storeFiles) {
- Vector<HStoreFile> v = byFamily.get(src.getColFamily());
+ List<HStoreFile> v = byFamily.get(src.getColFamily());
if(v == null) {
- v = new Vector<HStoreFile>();
+ v = new ArrayList<HStoreFile>();
byFamily.put(src.getColFamily(), v);
}
v.add(src);
@@ -192,11 +191,11 @@
// Members
//////////////////////////////////////////////////////////////////////////////
- volatile Map<Text, Long> rowsToLocks = new HashMap<Text, Long>();
- volatile Map<Long, Text> locksToRows = new HashMap<Long, Text>();
- volatile Map<Text, HStore> stores = new HashMap<Text, HStore>();
+ volatile Map<Text, Long> rowsToLocks = new ConcurrentHashMap<Text, Long>();
+ volatile Map<Long, Text> locksToRows = new ConcurrentHashMap<Long, Text>();
+ volatile Map<Text, HStore> stores = new ConcurrentHashMap<Text, HStore>();
volatile Map<Long, TreeMap<Text, byte []>> targetColumns =
- new HashMap<Long, TreeMap<Text, byte []>>();
+ new ConcurrentHashMap<Long, TreeMap<Text, byte []>>();
final AtomicLong memcacheSize = new AtomicLong(0);
@@ -359,7 +358,7 @@
*
* @throws IOException
*/
- public Vector<HStoreFile> close() throws IOException {
+ public List<HStoreFile> close() throws IOException {
return close(false);
}
@@ -377,7 +376,7 @@
*
* @throws IOException
*/
- Vector<HStoreFile> close(boolean abort) throws IOException {
+ List<HStoreFile> close(boolean abort) throws IOException {
if (isClosed()) {
LOG.info("region " + this.regionInfo.getRegionName() + " already closed");
return null;
@@ -421,7 +420,7 @@
internalFlushcache(snapshotMemcaches());
}
- Vector<HStoreFile> result = new Vector<HStoreFile>();
+ List<HStoreFile> result = new ArrayList<HStoreFile>();
for (HStore store: stores.values()) {
result.addAll(store.close());
}
@@ -571,7 +570,7 @@
// Now close the HRegion. Close returns all store files or null if not
// supposed to close (? What to do in this case? Implement abort of close?)
// Close also does wait on outstanding rows and calls a flush just-in-case.
- Vector<HStoreFile> hstoreFilesToSplit = close();
+ List<HStoreFile> hstoreFilesToSplit = close();
if (hstoreFilesToSplit == null) {
LOG.warn("Close came back null (Implement abort of close?)");
throw new RuntimeException("close returned empty vector of HStoreFiles");
@@ -909,6 +908,7 @@
// A. Flush memcache to all the HStores.
// Keep running vector of all store files that includes both old and the
// just-made new flush store file.
+
for (HStore hstore: stores.values()) {
hstore.flushCache(sequenceId);
}
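
One caveat that applies to all the HashMap -> ConcurrentHashMap swaps in
HRegion: ConcurrentHashMap rejects null keys and values with a
NullPointerException, so after this patch get(k) == null unambiguously means
"absent", and any code path that previously stored null in rowsToLocks,
locksToRows, stores, or targetColumns would need auditing. A tiny sketch:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    class NullCaveatSketch {
      public static void main(String[] args) {
        Map<String, Long> rowsToLocks = new ConcurrentHashMap<String, Long>();
        rowsToLocks.put("row1", Long.valueOf(1L));
        // rowsToLocks.put("row2", null);  // would throw NullPointerException
        Long lock = rowsToLocks.get("row2");
        System.out.println(lock == null);  // true: null means "not present"
      }
    }
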