snapshot = null;
- HLocking locking = new HLocking();
+ ReadWriteLock locker = new ReentrantReadWriteLock();
public HMemcache() {
}
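
This patch's central change is replacing the homegrown HLocking class with java.util.concurrent.locks.ReentrantReadWriteLock. Every call site below follows the same lock()/try/finally/unlock() idiom; here is a minimal, self-contained sketch of that idiom (a plain TreeMap stands in for the memcache, not the actual HMemcache fields):

    import java.util.TreeMap;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Sketch of the locking idiom this patch adopts: every read path takes
    // the read lock, every mutation takes the write lock, and unlock()
    // always happens in a finally block.
    public class GuardedMap {
      private final ReadWriteLock locker = new ReentrantReadWriteLock();
      private final TreeMap<String, byte[]> map = new TreeMap<String, byte[]>();

      public void put(String key, byte[] value) {
        locker.writeLock().lock();         // exclusive: blocks readers and writers
        try {
          map.put(key, value);
        } finally {
          locker.writeLock().unlock();     // never skipped, even on exceptions
        }
      }

      public byte[] get(String key) {
        locker.readLock().lock();          // shared: many readers at once
        try {
          return map.get(key);
        } finally {
          locker.readLock().unlock();
        }
      }
    }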
@@ -52,13 +54,17 @@
}
/**
- * We want to return a snapshot of the current HMemcache with a known HLog
+ * Returns a snapshot of the current HMemcache with a known HLog
* sequence number at the same time.
+ *
+ * We need to prevent any writing to the cache during this time,
+ * so we obtain a write lock for the duration of the operation.
*
- * Return both the frozen HMemcache TreeMap, as well as the HLog seq number.
- *
- * We need to prevent any writing to the cache during this time, so we obtain
- * a write lock for the duration of the operation.
+ * If this method returns non-null, client must call
+ * {@link #deleteSnapshot()} to clear 'snapshot-in-progress'
+ * state when finished with the returned {@link Snapshot}.
+ *
+ * @return frozen HMemcache TreeMap and HLog sequence number.
*/
public Snapshot snapshotMemcacheForLog(HLog log) throws IOException {
Snapshot retval = new Snapshot();
@@ -63,12 +69,12 @@
public Snapshot snapshotMemcacheForLog(HLog log) throws IOException {
Snapshot retval = new Snapshot();
- locking.obtainWriteLock();
+ this.locker.writeLock().lock();
try {
- if (snapshot != null) {
+ if(snapshot != null) {
throw new IOException("Snapshot in progress!");
}
- if (memcache.size() == 0) {
+ if(memcache.size() == 0) {
LOG.debug("memcache empty. Skipping snapshot");
return retval;
}
@@ -86,7 +92,7 @@
return retval;
} finally {
- locking.releaseWriteLock();
+ this.locker.writeLock().unlock();
}
}
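
The javadoc above commits callers to a cleanup protocol. A hedged sketch of the intended calling pattern, assuming 'memcache' (HMemcache) and 'log' (HLog) fields are in scope, as they are in HRegion.internalFlushcache():

    // If snapshotMemcacheForLog() hands back a populated snapshot, the
    // caller owns the 'snapshot-in-progress' state and must clear it.
    void flushWithSnapshot() throws IOException {
      HMemcache.Snapshot retval = memcache.snapshotMemcacheForLog(log);
      if (retval == null || retval.memcacheSnapshot == null) {
        return;                     // nothing to flush, no snapshot state to clear
      }
      try {
        // ... write retval.memcacheSnapshot out to the HStores ...
      } finally {
        memcache.deleteSnapshot();  // always clear 'snapshot-in-progress'
      }
    }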
@@ -96,10 +102,10 @@
* Modifying the structure means we need to obtain a write lock.
*/
public void deleteSnapshot() throws IOException {
- locking.obtainWriteLock();
+ this.locker.writeLock().lock();
try {
- if (snapshot == null) {
+ if(snapshot == null) {
throw new IOException("Snapshot not present!");
}
LOG.debug("deleting snapshot");
@@ -105,10 +111,10 @@
LOG.debug("deleting snapshot");
for(Iterator<TreeMap<HStoreKey, BytesWritable>> it = history.iterator();
- it.hasNext();) {
+ it.hasNext(); ) {
TreeMap<HStoreKey, BytesWritable> cur = it.next();
- if (snapshot == cur) {
+ if(snapshot == cur) {
it.remove();
break;
}
@@ -118,7 +124,7 @@
LOG.debug("snapshot deleted");
} finally {
- locking.releaseWriteLock();
+ this.locker.writeLock().unlock();
}
}
@@ -128,9 +134,9 @@
* Operation uses a write lock.
*/
public void add(Text row, TreeMap<Text, byte[]> columns, long timestamp) {
- locking.obtainWriteLock();
+ this.locker.writeLock().lock();
try {
- for(Iterator<Text> it = columns.keySet().iterator(); it.hasNext();) {
+ for(Iterator<Text> it = columns.keySet().iterator(); it.hasNext(); ) {
Text column = it.next();
byte[] val = columns.get(column);
@@ -139,7 +145,7 @@
}
} finally {
- locking.releaseWriteLock();
+ this.locker.writeLock().unlock();
}
}
@@ -150,7 +156,7 @@
*/
public byte[][] get(HStoreKey key, int numVersions) {
Vector<byte[]> results = new Vector<byte[]>();
- locking.obtainReadLock();
+ this.locker.readLock().lock();
try {
Vector<byte[]> result = get(memcache, key, numVersions-results.size());
results.addAll(0, result);
@@ -156,7 +162,7 @@
results.addAll(0, result);
for(int i = history.size()-1; i >= 0; i--) {
- if (numVersions > 0 && results.size() >= numVersions) {
+ if(numVersions > 0 && results.size() >= numVersions) {
break;
}
@@ -164,7 +170,7 @@
results.addAll(results.size(), result);
}
- if (results.size() == 0) {
+ if(results.size() == 0) {
return null;
} else {
@@ -172,7 +178,7 @@
}
} finally {
- locking.releaseReadLock();
+ this.locker.readLock().unlock();
}
}
@@ -184,7 +190,7 @@
*/
public TreeMap<Text, byte[]> getFull(HStoreKey key) throws IOException {
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
- locking.obtainReadLock();
+ this.locker.readLock().lock();
try {
internalGetFull(memcache, key, results);
for(int i = history.size()-1; i >= 0; i--) {
@@ -194,7 +200,7 @@
return results;
} finally {
- locking.releaseReadLock();
+ this.locker.readLock().unlock();
}
}
@@ -199,15 +205,15 @@
}
void internalGetFull(TreeMap<HStoreKey, BytesWritable> map, HStoreKey key,
- TreeMap<Text, byte[]> results) {
+ TreeMap<Text, byte[]> results) {
SortedMap<HStoreKey, BytesWritable> tailMap = map.tailMap(key);
- for(Iterator<HStoreKey> it = tailMap.keySet().iterator(); it.hasNext();) {
+ for(Iterator<HStoreKey> it = tailMap.keySet().iterator(); it.hasNext(); ) {
HStoreKey itKey = it.next();
Text itCol = itKey.getColumn();
- if (results.get(itCol) == null
+ if(results.get(itCol) == null
&& key.matchesWithoutColumn(itKey)) {
BytesWritable val = tailMap.get(itKey);
results.put(itCol, val.get());
@@ -212,7 +218,7 @@
BytesWritable val = tailMap.get(itKey);
results.put(itCol, val.get());
- } else if (key.getRow().compareTo(itKey.getRow()) > 0) {
+ } else if(key.getRow().compareTo(itKey.getRow()) > 0) {
break;
}
}
@@ -232,10 +238,10 @@
HStoreKey curKey = new HStoreKey(key.getRow(), key.getColumn(), key.getTimestamp());
SortedMap<HStoreKey, BytesWritable> tailMap = map.tailMap(curKey);
- for(Iterator<HStoreKey> it = tailMap.keySet().iterator(); it.hasNext();) {
+ for(Iterator<HStoreKey> it = tailMap.keySet().iterator(); it.hasNext(); ) {
HStoreKey itKey = it.next();
- if (itKey.matchesRowCol(curKey)) {
+ if(itKey.matchesRowCol(curKey)) {
result.add(tailMap.get(itKey).get());
curKey.setVersion(itKey.getTimestamp() - 1);
}
@@ -240,7 +246,7 @@
curKey.setVersion(itKey.getTimestamp() - 1);
}
- if (numVersions > 0 && result.size() >= numVersions) {
+ if(numVersions > 0 && result.size() >= numVersions) {
break;
}
}
@@ -251,7 +257,7 @@
* Return a scanner over the keys in the HMemcache
*/
public HScannerInterface getScanner(long timestamp, Text targetCols[], Text firstRow)
- throws IOException {
+ throws IOException {
return new HMemcacheScanner(timestamp, targetCols, firstRow);
}
@@ -267,11 +273,11 @@
@SuppressWarnings("unchecked")
public HMemcacheScanner(long timestamp, Text targetCols[], Text firstRow)
- throws IOException {
+ throws IOException {
super(timestamp, targetCols);
- locking.obtainReadLock();
+ locker.readLock().lock();
try {
this.backingMaps = new TreeMap[history.size() + 1];
int i = 0;
@@ -276,7 +282,7 @@
this.backingMaps = new TreeMap[history.size() + 1];
int i = 0;
for(Iterator<TreeMap<HStoreKey, BytesWritable>> it = history.iterator();
- it.hasNext();) {
+ it.hasNext(); ) {
backingMaps[i++] = it.next();
}
@@ -290,7 +296,7 @@
HStoreKey firstKey = new HStoreKey(firstRow);
for(i = 0; i < backingMaps.length; i++) {
- if (firstRow.getLength() != 0) {
+ if(firstRow.getLength() != 0) {
keyIterators[i] = backingMaps[i].tailMap(firstKey).keySet().iterator();
} else {
@@ -298,10 +304,10 @@
}
while(getNext(i)) {
- if (!findFirstRow(i, firstRow)) {
+ if(! findFirstRow(i, firstRow)) {
continue;
}
- if (columnMatch(i)) {
+ if(columnMatch(i)) {
break;
}
}
@@ -331,7 +337,7 @@
* @return - true if there is more data available
*/
boolean getNext(int i) {
- if (!keyIterators[i].hasNext()) {
+ if(! keyIterators[i].hasNext()) {
closeSubScanner(i);
return false;
}
@@ -350,10 +356,10 @@
/** Shut down map iterators, and release the lock */
public void close() throws IOException {
- if (!scannerClosed) {
+ if(! scannerClosed) {
try {
for(int i = 0; i < keys.length; i++) {
- if (keyIterators[i] != null) {
+ if(keyIterators[i] != null) {
closeSubScanner(i);
}
}
@@ -359,7 +365,7 @@
}
} finally {
- locking.releaseReadLock();
+ locker.readLock().unlock();
scannerClosed = true;
}
}
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java (revision 530954)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java (working copy)
@@ -67,13 +67,13 @@
// Writable
//////////////////////////////////////////////////////////////////////////////
- public void write(DataOutput out) throws IOException {
- out.writeByte(msg);
- info.write(out);
- }
+ public void write(DataOutput out) throws IOException {
+ out.writeByte(msg);
+ info.write(out);
+ }
- public void readFields(DataInput in) throws IOException {
- this.msg = in.readByte();
- this.info.readFields(in);
- }
+ public void readFields(DataInput in) throws IOException {
+ this.msg = in.readByte();
+ this.info.readFields(in);
+ }
}
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (revision 530954)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (working copy)
@@ -61,8 +61,8 @@
// Make sure that srcA comes first; important for key-ordering during
// write of the merged file.
- if (srcA.getStartKey() == null) {
- if (srcB.getStartKey() == null) {
+ if(srcA.getStartKey() == null) {
+ if(srcB.getStartKey() == null) {
throw new IOException("Cannot merge two regions with null start key");
}
// A's start key is null but B's isn't. Assume A comes before B
@@ -67,8 +67,8 @@
}
// A's start key is null but B's isn't. Assume A comes before B
- } else if ((srcB.getStartKey() == null) // A is not null but B is
- || (srcA.getStartKey().compareTo(srcB.getStartKey()) > 0)) { // A > B
+ } else if((srcB.getStartKey() == null) // A is not null but B is
+ || (srcA.getStartKey().compareTo(srcB.getStartKey()) > 0)) { // A > B
HRegion tmp = srcA;
srcA = srcB;
@@ -75,7 +75,7 @@
srcB = tmp;
}
- if (!srcA.getEndKey().equals(srcB.getStartKey())) {
+ if (! srcA.getEndKey().equals(srcB.getStartKey())) {
throw new IOException("Cannot merge non-adjacent regions");
}
@@ -89,7 +89,7 @@
Text endKey = srcB.getEndKey();
Path merges = new Path(srcA.getRegionDir(), MERGEDIR);
- if (!fs.exists(merges)) {
+ if(! fs.exists(merges)) {
fs.mkdirs(merges);
}
@@ -98,7 +98,7 @@
Path newRegionDir = HStoreFile.getHRegionDir(merges, newRegionInfo.regionName);
- if (fs.exists(newRegionDir)) {
+ if(fs.exists(newRegionDir)) {
throw new IOException("Cannot merge; target file collision at " + newRegionDir);
}
@@ -103,9 +103,9 @@
}
LOG.info("starting merge of regions: " + srcA.getRegionName() + " and "
- + srcB.getRegionName() + " new region start key is '"
- + (startKey == null ? "" : startKey) + "', end key is '"
- + (endKey == null ? "" : endKey) + "'");
+ + srcB.getRegionName() + " new region start key is '"
+ + (startKey == null ? "" : startKey) + "', end key is '"
+ + (endKey == null ? "" : endKey) + "'");
// Flush each of the sources, and merge their files into a single
// target for each column family.
@@ -114,10 +114,10 @@
TreeSet<HStoreFile> alreadyMerged = new TreeSet<HStoreFile>();
TreeMap<Text, Vector<HStoreFile>> filesToMerge = new TreeMap<Text, Vector<HStoreFile>>();
- for(Iterator<HStoreFile> it = srcA.flushcache(true).iterator(); it.hasNext();) {
+ for(Iterator<HStoreFile> it = srcA.flushcache(true).iterator(); it.hasNext(); ) {
HStoreFile src = it.next();
Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
- if (v == null) {
+ if(v == null) {
v = new Vector<HStoreFile>();
filesToMerge.put(src.getColFamily(), v);
}
@@ -126,10 +126,10 @@
LOG.debug("flushing and getting file names for region " + srcB.getRegionName());
- for(Iterator<HStoreFile> it = srcB.flushcache(true).iterator(); it.hasNext();) {
+ for(Iterator<HStoreFile> it = srcB.flushcache(true).iterator(); it.hasNext(); ) {
HStoreFile src = it.next();
Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
- if (v == null) {
+ if(v == null) {
v = new Vector<HStoreFile>();
filesToMerge.put(src.getColFamily(), v);
}
@@ -138,11 +138,11 @@
LOG.debug("merging stores");
- for(Iterator<Text> it = filesToMerge.keySet().iterator(); it.hasNext();) {
+ for(Iterator<Text> it = filesToMerge.keySet().iterator(); it.hasNext(); ) {
Text colFamily = it.next();
Vector<HStoreFile> srcFiles = filesToMerge.get(colFamily);
HStoreFile dst = new HStoreFile(conf, merges, newRegionInfo.regionName,
- colFamily, Math.abs(rand.nextLong()));
+ colFamily, Math.abs(rand.nextLong()));
dst.mergeStoreFiles(srcFiles, fs, conf);
alreadyMerged.addAll(srcFiles);
@@ -153,15 +153,15 @@
// of any last-minute inserts
LOG.debug("flushing changes since start of merge for region "
- + srcA.getRegionName());
+ + srcA.getRegionName());
filesToMerge.clear();
- for(Iterator<HStoreFile> it = srcA.close().iterator(); it.hasNext();) {
+ for(Iterator<HStoreFile> it = srcA.close().iterator(); it.hasNext(); ) {
HStoreFile src = it.next();
- if (!alreadyMerged.contains(src)) {
+ if(! alreadyMerged.contains(src)) {
Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
- if (v == null) {
+ if(v == null) {
v = new Vector<HStoreFile>();
filesToMerge.put(src.getColFamily(), v);
}
@@ -170,14 +170,14 @@
}
LOG.debug("flushing changes since start of merge for region "
- + srcB.getRegionName());
+ + srcB.getRegionName());
- for(Iterator<HStoreFile> it = srcB.close().iterator(); it.hasNext();) {
+ for(Iterator<HStoreFile> it = srcB.close().iterator(); it.hasNext(); ) {
HStoreFile src = it.next();
- if (!alreadyMerged.contains(src)) {
+ if(! alreadyMerged.contains(src)) {
Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
- if (v == null) {
+ if(v == null) {
v = new Vector<HStoreFile>();
filesToMerge.put(src.getColFamily(), v);
}
@@ -187,11 +187,11 @@
LOG.debug("merging changes since start of merge");
- for(Iterator<Text> it = filesToMerge.keySet().iterator(); it.hasNext();) {
+ for(Iterator<Text> it = filesToMerge.keySet().iterator(); it.hasNext(); ) {
Text colFamily = it.next();
Vector<HStoreFile> srcFiles = filesToMerge.get(colFamily);
HStoreFile dst = new HStoreFile(conf, merges, newRegionInfo.regionName,
- colFamily, Math.abs(rand.nextLong()));
+ colFamily, Math.abs(rand.nextLong()));
dst.mergeStoreFiles(srcFiles, fs, conf);
}
@@ -199,7 +199,7 @@
// Done
HRegion dstRegion = new HRegion(dir, log, fs, conf, newRegionInfo,
- newRegionDir, null);
+ newRegionDir, null);
// Get rid of merges directory
@@ -284,7 +284,7 @@
* written-to before), then read it from the supplied path.
*/
public HRegion(Path dir, HLog log, FileSystem fs, Configuration conf,
- HRegionInfo regionInfo, Path initialFiles, Path oldLogFile) throws IOException {
+ HRegionInfo regionInfo, Path initialFiles, Path oldLogFile) throws IOException {
this.dir = dir;
this.log = log;
@@ -303,7 +303,7 @@
// Move prefab HStore files into place (if any)
- if (initialFiles != null && fs.exists(initialFiles)) {
+ if(initialFiles != null && fs.exists(initialFiles)) {
fs.rename(initialFiles, regiondir);
}
@@ -310,11 +310,11 @@
// Load in all the HStores.
for(Iterator<Text> it = this.regionInfo.tableDesc.families().iterator();
- it.hasNext();) {
+ it.hasNext(); ) {
- Text colFamily = it.next();
+ Text colFamily = HStoreKey.extractFamily(it.next());
stores.put(colFamily, new HStore(dir, this.regionInfo.regionName, colFamily,
- this.regionInfo.tableDesc.getMaxVersions(), fs, oldLogFile, conf));
+ this.regionInfo.tableDesc.getMaxVersions(), fs, oldLogFile, conf));
}
// Get rid of any splits or merges that were lost in-progress
@@ -320,7 +320,7 @@
// Get rid of any splits or merges that were lost in-progress
Path splits = new Path(regiondir, SPLITDIR);
- if (fs.exists(splits)) {
+ if(fs.exists(splits)) {
fs.delete(splits);
}
@@ -325,7 +325,7 @@
}
Path merges = new Path(regiondir, MERGEDIR);
- if (fs.exists(merges)) {
+ if(fs.exists(merges)) {
fs.delete(merges);
}
@@ -345,6 +345,7 @@
/** Closes and deletes this HRegion. Called when doing a table deletion, for example */
public void closeAndDelete() throws IOException {
+ LOG.info("deleting region: " + regionInfo.regionName);
close();
fs.delete(regiondir);
}
@@ -362,7 +363,7 @@
public Vector<HStoreFile> close() throws IOException {
boolean shouldClose = false;
synchronized(writestate) {
- if (writestate.closed) {
+ if(writestate.closed) {
LOG.info("region " + this.regionInfo.regionName + " closed");
return new Vector<HStoreFile>();
}
@@ -376,7 +377,7 @@
shouldClose = true;
}
- if (!shouldClose) {
+ if(! shouldClose) {
return null;
} else {
@@ -382,7 +383,7 @@
} else {
LOG.info("closing region " + this.regionInfo.regionName);
Vector<HStoreFile> allHStoreFiles = internalFlushcache();
- for(Iterator<HStore> it = stores.values().iterator(); it.hasNext();) {
+ for(Iterator<HStore> it = stores.values().iterator(); it.hasNext(); ) {
HStore store = it.next();
store.close();
}
@@ -406,8 +407,8 @@
* Returns two brand-new (and open) HRegions
*/
public HRegion[] closeAndSplit(Text midKey) throws IOException {
- if (((regionInfo.startKey.getLength() != 0)
- && (regionInfo.startKey.compareTo(midKey) > 0))
+ if(((regionInfo.startKey.getLength() != 0)
+ && (regionInfo.startKey.compareTo(midKey) > 0))
|| ((regionInfo.endKey.getLength() != 0)
&& (regionInfo.endKey.compareTo(midKey) < 0))) {
throw new IOException("Region splitkey must lie within region boundaries.");
@@ -419,7 +420,7 @@
// or compactions until close() is called.
Path splits = new Path(regiondir, SPLITDIR);
- if (!fs.exists(splits)) {
+ if(! fs.exists(splits)) {
fs.mkdirs(splits);
}
@@ -425,7 +426,7 @@
long regionAId = Math.abs(rand.nextLong());
HRegionInfo regionAInfo = new HRegionInfo(regionAId, regionInfo.tableDesc,
- regionInfo.startKey, midKey);
+ regionInfo.startKey, midKey);
long regionBId = Math.abs(rand.nextLong());
HRegionInfo regionBInfo
@@ -434,9 +435,9 @@
Path dirA = HStoreFile.getHRegionDir(splits, regionAInfo.regionName);
Path dirB = HStoreFile.getHRegionDir(splits, regionBInfo.regionName);
- if (fs.exists(dirA) || fs.exists(dirB)) {
+ if(fs.exists(dirA) || fs.exists(dirB)) {
throw new IOException("Cannot split; target file collision at " + dirA
- + " or " + dirB);
+ + " or " + dirB);
}
TreeSet<HStoreFile> alreadySplit = new TreeSet<HStoreFile>();
@@ -441,17 +442,17 @@
TreeSet<HStoreFile> alreadySplit = new TreeSet<HStoreFile>();
Vector<HStoreFile> hstoreFilesToSplit = flushcache(true);
- for(Iterator<HStoreFile> it = hstoreFilesToSplit.iterator(); it.hasNext();) {
+ for(Iterator<HStoreFile> it = hstoreFilesToSplit.iterator(); it.hasNext(); ) {
HStoreFile hsf = it.next();
LOG.debug("splitting HStore " + hsf.getRegionName() + "/" + hsf.getColFamily()
- + "/" + hsf.fileId());
+ + "/" + hsf.fileId());
HStoreFile dstA = new HStoreFile(conf, splits, regionAInfo.regionName,
- hsf.getColFamily(), Math.abs(rand.nextLong()));
+ hsf.getColFamily(), Math.abs(rand.nextLong()));
HStoreFile dstB = new HStoreFile(conf, splits, regionBInfo.regionName,
- hsf.getColFamily(), Math.abs(rand.nextLong()));
+ hsf.getColFamily(), Math.abs(rand.nextLong()));
hsf.splitStoreFile(midKey, dstA, dstB, fs, conf);
alreadySplit.add(hsf);
@@ -461,18 +462,18 @@
// and copy the small remainder
hstoreFilesToSplit = close();
- for(Iterator<HStoreFile> it = hstoreFilesToSplit.iterator(); it.hasNext();) {
+ for(Iterator<HStoreFile> it = hstoreFilesToSplit.iterator(); it.hasNext(); ) {
HStoreFile hsf = it.next();
- if (!alreadySplit.contains(hsf)) {
+ if(! alreadySplit.contains(hsf)) {
LOG.debug("splitting HStore " + hsf.getRegionName() + "/" + hsf.getColFamily()
- + "/" + hsf.fileId());
+ + "/" + hsf.fileId());
HStoreFile dstA = new HStoreFile(conf, splits, regionAInfo.regionName,
- hsf.getColFamily(), Math.abs(rand.nextLong()));
+ hsf.getColFamily(), Math.abs(rand.nextLong()));
HStoreFile dstB = new HStoreFile(conf, splits, regionBInfo.regionName,
- hsf.getColFamily(), Math.abs(rand.nextLong()));
+ hsf.getColFamily(), Math.abs(rand.nextLong()));
hsf.splitStoreFile(midKey, dstA, dstB, fs, conf);
}
@@ -494,7 +495,7 @@
regions[1] = regionB;
LOG.info("region split complete. new regions are: " + regions[0].getRegionName()
- + ", " + regions[1].getRegionName());
+ + ", " + regions[1].getRegionName());
return regions;
}
@@ -565,10 +566,10 @@
Text key = new Text();
long maxSize = 0;
- for(Iterator<HStore> i = stores.values().iterator(); i.hasNext();) {
+ for(Iterator<HStore> i = stores.values().iterator(); i.hasNext(); ) {
long size = i.next().getLargestFileSize(key);
- if (size > maxSize) { // Largest so far
+ if(size > maxSize) { // Largest so far
maxSize = size;
midKey.set(key);
}
@@ -593,9 +594,9 @@
public boolean compactStores() throws IOException {
boolean shouldCompact = false;
synchronized(writestate) {
- if ((!writestate.writesOngoing)
+ if((! writestate.writesOngoing)
&& writestate.writesEnabled
- && (!writestate.closed)
+ && (! writestate.closed)
&& recentCommits > MIN_COMMITS_FOR_COMPACTION) {
writestate.writesOngoing = true;
@@ -603,7 +604,7 @@
}
}
- if (!shouldCompact) {
+ if(! shouldCompact) {
LOG.info("not compacting region " + this.regionInfo.regionName);
return false;
@@ -610,7 +611,7 @@
} else {
try {
LOG.info("starting compaction on region " + this.regionInfo.regionName);
- for(Iterator<HStore> it = stores.values().iterator(); it.hasNext();) {
+ for(Iterator<HStore> it = stores.values().iterator(); it.hasNext(); ) {
HStore store = it.next();
store.compact();
}
@@ -632,7 +633,7 @@
* only take if there have been a lot of uncommitted writes.
*/
public void optionallyFlush() throws IOException {
- if (commitsSinceFlush > maxUnflushedEntries) {
+ if(commitsSinceFlush > maxUnflushedEntries) {
flushcache(false);
}
}
@@ -657,9 +658,9 @@
public Vector<HStoreFile> flushcache(boolean disableFutureWrites) throws IOException {
boolean shouldFlush = false;
synchronized(writestate) {
- if ((!writestate.writesOngoing)
+ if((! writestate.writesOngoing)
&& writestate.writesEnabled
- && (!writestate.closed)) {
+ && (! writestate.closed)) {
writestate.writesOngoing = true;
shouldFlush = true;
@@ -664,7 +665,7 @@
writestate.writesOngoing = true;
shouldFlush = true;
- if (disableFutureWrites) {
+ if(disableFutureWrites) {
writestate.writesEnabled = false;
}
}
@@ -670,7 +671,7 @@
}
}
- if (!shouldFlush) {
+ if(! shouldFlush) {
LOG.debug("not flushing cache for region " + this.regionInfo.regionName);
return null;
@@ -731,8 +732,8 @@
HMemcache.Snapshot retval = memcache.snapshotMemcacheForLog(log);
TreeMap<HStoreKey, BytesWritable> memcacheSnapshot = retval.memcacheSnapshot;
- if (memcacheSnapshot == null) {
- for(Iterator<HStore> it = stores.values().iterator(); it.hasNext();) {
+ if(memcacheSnapshot == null) {
+ for(Iterator<HStore> it = stores.values().iterator(); it.hasNext(); ) {
HStore hstore = it.next();
Vector<HStoreFile> hstoreFiles = hstore.getAllMapFiles();
allHStoreFiles.addAll(0, hstoreFiles);
@@ -746,7 +747,7 @@
LOG.debug("flushing memcache to HStores");
- for(Iterator<HStore> it = stores.values().iterator(); it.hasNext();) {
+ for(Iterator<HStore> it = stores.values().iterator(); it.hasNext(); ) {
HStore hstore = it.next();
Vector<HStoreFile> hstoreFiles
= hstore.flushCache(memcacheSnapshot, logCacheFlushId);
@@ -762,7 +763,7 @@
LOG.debug("writing flush cache complete to log");
log.completeCacheFlush(this.regionInfo.regionName,
- regionInfo.tableDesc.getName(), logCacheFlushId);
+ regionInfo.tableDesc.getName(), logCacheFlushId);
// C. Delete the now-irrelevant memcache snapshot; its contents have been
// dumped to disk-based HStores.
@@ -784,7 +785,7 @@
/** Fetch a single data item. */
public byte[] get(Text row, Text column) throws IOException {
byte results[][] = get(row, column, Long.MAX_VALUE, 1);
- if (results == null) {
+ if(results == null) {
return null;
} else {
@@ -799,9 +800,9 @@
/** Fetch multiple versions of a single data item, with timestamp. */
public byte[][] get(Text row, Text column, long timestamp, int numVersions)
- throws IOException {
+ throws IOException {
- if (writestate.closed) {
+ if(writestate.closed) {
throw new IOException("HRegion is closed.");
}
@@ -808,8 +809,7 @@
// Make sure this is a valid row and valid column
checkRow(row);
- Text colFamily = HStoreKey.extractFamily(column);
- checkFamily(colFamily);
+ checkColumn(column);
// Obtain the row-lock
@@ -830,7 +830,7 @@
// Check the memcache
byte[][] result = memcache.get(key, numVersions);
- if (result != null) {
+ if(result != null) {
return result;
}
@@ -838,7 +838,7 @@
Text colFamily = HStoreKey.extractFamily(key.getColumn());
HStore targetStore = stores.get(colFamily);
- if (targetStore == null) {
+ if(targetStore == null) {
return null;
}
@@ -859,7 +859,7 @@
HStoreKey key = new HStoreKey(row, System.currentTimeMillis());
TreeMap<Text, byte[]> memResult = memcache.getFull(key);
- for(Iterator<Text> it = stores.keySet().iterator(); it.hasNext();) {
+ for(Iterator<Text> it = stores.keySet().iterator(); it.hasNext(); ) {
Text colFamily = it.next();
HStore targetStore = stores.get(colFamily);
targetStore.getFull(key, memResult);
@@ -879,7 +879,7 @@
HStore storelist[] = new HStore[families.size()];
int i = 0;
- for(Iterator<Text> it = families.iterator(); it.hasNext();) {
+ for(Iterator<Text> it = families.iterator(); it.hasNext(); ) {
Text family = it.next();
storelist[i++] = stores.get(family);
}
@@ -911,7 +911,7 @@
/**
* Put a cell value into the locked row. The user indicates the row-lock, the
* target column, and the desired value. This stuff is set into a temporary
- * memory area until the user commits the change, at which pointit's logged
+ * memory area until the user commits the change, at which point it's logged
* and placed into the memcache.
*
* This method really just tests the input, then calls an internal localput()
@@ -918,10 +918,10 @@
* method.
*/
public void put(long lockid, Text targetCol, byte[] val) throws IOException {
- if (val.length == HStoreKey.DELETE_BYTES.length) {
+ if(val.length == HStoreKey.DELETE_BYTES.length) {
boolean matches = true;
for(int i = 0; i < val.length; i++) {
- if (val[i] != HStoreKey.DELETE_BYTES[i]) {
+ if(val[i] != HStoreKey.DELETE_BYTES[i]) {
matches = false;
break;
}
@@ -927,7 +927,7 @@
}
}
- if (matches) {
+ if(matches) {
throw new IOException("Cannot insert value: " + val);
}
}
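
The put() javadoc above describes a two-phase update: values are staged in a temporary area under a row lock and only become visible at commit. A sketch of the calling sequence against an open HRegion (the column name "info:name" is hypothetical):

    // Two-phase update: stage under the row lock, then commit or abort.
    void updateRow(HRegion region, Text row, byte[] value) throws IOException {
      long lockid = region.startUpdate(row);             // obtain the row lock
      try {
        region.put(lockid, new Text("info:name"), value); // staged in targetColumns
        region.commit(lockid);                           // append to HLog, add to memcache
      } catch (IOException e) {
        region.abort(lockid);                            // discard staged edits
        throw e;
      }
    }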
@@ -950,9 +950,11 @@
* (Or until the user's write-lock expires.)
*/
void localput(long lockid, Text targetCol, byte[] val) throws IOException {
+ checkColumn(targetCol);
+
Text row = getRowFromLock(lockid);
- if (row == null) {
- throw new IOException("No write lock for lockid " + lockid);
+ if(row == null) {
+ throw new LockException("No write lock for lockid " + lockid);
}
// This sync block makes localput() thread-safe when multiple
@@ -964,13 +966,13 @@
// This check makes sure that another thread from the client
// hasn't aborted/committed the write-operation.
- if (row != getRowFromLock(lockid)) {
- throw new IOException("Locking error: put operation on lock " + lockid
- + " unexpected aborted by another thread");
+ if(row != getRowFromLock(lockid)) {
+ throw new LockException("Locking error: put operation on lock " + lockid
+ + " unexpected aborted by another thread");
}
TreeMap<Text, byte[]> targets = targetColumns.get(lockid);
- if (targets == null) {
+ if(targets == null) {
targets = new TreeMap<Text, byte[]>();
targetColumns.put(lockid, targets);
}
@@ -985,8 +987,8 @@
*/
public void abort(long lockid) throws IOException {
Text row = getRowFromLock(lockid);
- if (row == null) {
- throw new IOException("No write lock for lockid " + lockid);
+ if(row == null) {
+ throw new LockException("No write lock for lockid " + lockid);
}
// This sync block makes abort() thread-safe when multiple
@@ -998,9 +1000,9 @@
// This check makes sure another thread from the client
// hasn't aborted/committed the write-operation.
- if (row != getRowFromLock(lockid)) {
- throw new IOException("Locking error: abort() operation on lock "
- + lockid + " unexpected aborted by another thread");
+ if(row != getRowFromLock(lockid)) {
+ throw new LockException("Locking error: abort() operation on lock "
+ + lockid + " unexpected aborted by another thread");
}
targetColumns.remove(lockid);
@@ -1021,8 +1023,8 @@
// that repeated executions won't screw this up.
Text row = getRowFromLock(lockid);
- if (row == null) {
- throw new IOException("No write lock for lockid " + lockid);
+ if(row == null) {
+ throw new LockException("No write lock for lockid " + lockid);
}
// This check makes sure that another thread from the client
@@ -1035,7 +1037,7 @@
long commitTimestamp = System.currentTimeMillis();
log.append(regionInfo.regionName, regionInfo.tableDesc.getName(), row,
- targetColumns.get(lockid), commitTimestamp);
+ targetColumns.get(lockid), commitTimestamp);
memcache.add(row, targetColumns.get(lockid), commitTimestamp);
@@ -1054,8 +1056,8 @@
/** Make sure this is a valid row for the HRegion */
void checkRow(Text row) throws IOException {
- if (((regionInfo.startKey.getLength() == 0)
- || (regionInfo.startKey.compareTo(row) <= 0))
+ if(((regionInfo.startKey.getLength() == 0)
+ || (regionInfo.startKey.compareTo(row) <= 0))
&& ((regionInfo.endKey.getLength() == 0)
|| (regionInfo.endKey.compareTo(row) > 0))) {
// all's well
@@ -1062,17 +1064,18 @@
} else {
throw new IOException("Requested row out of range for HRegion "
- + regionInfo.regionName + ", startKey='" + regionInfo.startKey
- + "', endKey='" + regionInfo.endKey + "', row='" + row + "'");
+ + regionInfo.regionName + ", startKey='" + regionInfo.startKey
+ + "', endKey='" + regionInfo.endKey + "', row='" + row + "'");
}
}
-
+
/** Make sure this is a valid column for the current table */
- void checkFamily(Text family) throws IOException {
- if (!regionInfo.tableDesc.hasFamily(family)) {
+ void checkColumn(Text columnName) throws IOException {
+ Text family = new Text(HStoreKey.extractFamily(columnName) + ":");
+ if(! regionInfo.tableDesc.hasFamily(family)) {
throw new IOException("Requested column family " + family
- + " does not exist in HRegion " + regionInfo.regionName
- + " for table " + regionInfo.tableDesc.getName());
+ + " does not exist in HRegion " + regionInfo.regionName
+ + " for table " + regionInfo.tableDesc.getName());
}
}
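
The new checkColumn() replaces checkFamily() so callers such as localput() can validate a full column name directly; note that it re-appends the trailing ':' because the table descriptor keys families that way. A short illustration ("info" is a hypothetical family):

    // checkColumn() accepts a full "family:qualifier" column name and
    // verifies the family against the table descriptor.
    region.checkColumn(new Text("info:name"));   // passes if family "info:" exists
    region.checkColumn(new Text("bogus:name"));  // throws IOException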
@@ -1092,6 +1095,8 @@
* which maybe we'll do in the future.
*/
long obtainLock(Text row) throws IOException {
+ checkRow(row);
+
synchronized(rowsToLocks) {
while(rowsToLocks.get(row) != null) {
try {
@@ -1109,6 +1114,8 @@
}
Text getRowFromLock(long lockid) throws IOException {
+ // Pattern is that all access to rowsToLocks and/or to
+ // locksToRows is via a lock on rowsToLocks.
synchronized(rowsToLocks) {
return locksToRows.get(lockid);
}
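
The new comment codifies the convention that both lock maps share a single monitor. A minimal sketch of that convention, with illustrative field names and types (the actual HRegion fields are not shown in this hunk):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Random;

    // Both maps are only touched while holding rowsToLocks' monitor,
    // so the forward and reverse mappings can never disagree.
    public class RowLocks {
      private final Map<String, Long> rowsToLocks = new HashMap<String, Long>();
      private final Map<Long, String> locksToRows = new HashMap<Long, String>();
      private final Random rand = new Random();

      long obtainLock(String row) {
        synchronized (rowsToLocks) {
          while (rowsToLocks.get(row) != null) {   // another writer holds the row
            try {
              rowsToLocks.wait();                  // releases the monitor while blocked
            } catch (InterruptedException ie) {
            }
          }
          long lockid = rand.nextLong();
          rowsToLocks.put(row, lockid);
          locksToRows.put(lockid, row);
          return lockid;
        }
      }

      String getRowFromLock(long lockid) {
        synchronized (rowsToLocks) {               // same monitor for the reverse map
          return locksToRows.get(lockid);
        }
      }

      void releaseLock(long lockid) {
        synchronized (rowsToLocks) {
          String row = locksToRows.remove(lockid);
          rowsToLocks.remove(row);
          rowsToLocks.notifyAll();                 // wake waiters in obtainLock()
        }
      }
    }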
@@ -1150,7 +1157,7 @@
keys[i] = new HStoreKey();
resultSets[i] = new TreeMap<Text, byte[]>();
- if (!scanners[i].next(keys[i], resultSets[i])) {
+ if(! scanners[i].next(keys[i], resultSets[i])) {
closeScanner(i);
}
}
@@ -1167,7 +1174,7 @@
Text chosenRow = null;
long chosenTimestamp = -1;
for(int i = 0; i < keys.length; i++) {
- if (scanners[i] != null
+ if(scanners[i] != null
&& (chosenRow == null
|| (keys[i].getRow().compareTo(chosenRow) < 0)
|| ((keys[i].getRow().compareTo(chosenRow) == 0)
@@ -1181,7 +1188,7 @@
// Store the key and results for each sub-scanner. Merge them as appropriate.
boolean insertedItem = false;
- if (chosenTimestamp > 0) {
+ if(chosenTimestamp > 0) {
key.setRow(chosenRow);
key.setVersion(chosenTimestamp);
key.setColumn(new Text(""));
@@ -1188,8 +1195,8 @@
for(int i = 0; i < scanners.length; i++) {
while((scanners[i] != null)
- && (keys[i].getRow().compareTo(chosenRow) == 0)
- && (keys[i].getTimestamp() == chosenTimestamp)) {
+ && (keys[i].getRow().compareTo(chosenRow) == 0)
+ && (keys[i].getTimestamp() == chosenTimestamp)) {
results.putAll(resultSets[i]);
insertedItem = true;
@@ -1195,7 +1202,7 @@
insertedItem = true;
resultSets[i].clear();
- if (!scanners[i].next(keys[i], resultSets[i])) {
+ if(! scanners[i].next(keys[i], resultSets[i])) {
closeScanner(i);
}
}
@@ -1204,10 +1211,10 @@
// row label, then its timestamp is bad. We need to advance it.
while((scanners[i] != null)
- && (keys[i].getRow().compareTo(chosenRow) <= 0)) {
+ && (keys[i].getRow().compareTo(chosenRow) <= 0)) {
resultSets[i].clear();
- if (!scanners[i].next(keys[i], resultSets[i])) {
+ if(! scanners[i].next(keys[i], resultSets[i])) {
closeScanner(i);
}
}
@@ -1231,7 +1238,7 @@
/** All done with the scanner. */
public void close() throws IOException {
for(int i = 0; i < scanners.length; i++) {
- if (scanners[i] != null) {
+ if(scanners[i] != null) {
closeScanner(i);
}
}
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java (revision 530954)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java (working copy)
@@ -38,11 +38,11 @@
}
public HRegionInfo(long regionId, HTableDescriptor tableDesc, Text startKey,
- Text endKey) throws IllegalArgumentException {
+ Text endKey) throws IllegalArgumentException {
this.regionId = regionId;
- if (tableDesc == null) {
+ if(tableDesc == null) {
throw new IllegalArgumentException("tableDesc cannot be null");
}
@@ -49,7 +49,7 @@
this.tableDesc = tableDesc;
this.startKey = new Text();
- if (startKey != null) {
+ if(startKey != null) {
this.startKey.set(startKey);
}
@@ -54,7 +54,7 @@
}
this.endKey = new Text();
- if (endKey != null) {
+ if(endKey != null) {
this.endKey.set(endKey);
}
@@ -59,7 +59,7 @@
}
this.regionName = new Text(tableDesc.getName() + "_"
- + (startKey == null ? "" : startKey.toString()) + "_" + regionId);
+ + (startKey == null ? "" : startKey.toString()) + "_" + regionId);
}
//////////////////////////////////////////////////////////////////////////////
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java (revision 530954)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java (working copy)
@@ -15,7 +15,9 @@
*/
package org.apache.hadoop.hbase;
-import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.VersionedProtocol;
import java.io.*;
@@ -23,7 +25,7 @@
* Clients interact with HRegionServers using
* a handle to the HRegionInterface.
******************************************************************************/
-public interface HRegionInterface {
+public interface HRegionInterface extends VersionedProtocol {
public static final long versionID = 1L; // initial version
// Get metainfo about an HRegion
@@ -30,10 +32,6 @@
public HRegionInfo getRegionInfo(Text regionName);
- // Start a scanner for a given HRegion.
-
- public HScannerInterface openScanner(Text regionName, Text[] columns, Text startRow) throws IOException;
-
// GET methods for an HRegion.
public BytesWritable get(Text regionName, Text row, Text column) throws IOException;
@@ -58,4 +56,41 @@
public void abort(Text regionName, long clientid, long lockid) throws IOException;
public void commit(Text regionName, long clientid, long lockid) throws IOException;
public void renewLease(long lockid, long clientid) throws IOException;
+
+ //////////////////////////////////////////////////////////////////////////////
+ // remote scanner interface
+ //////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * Opens a remote scanner.
+ *
+ * @param regionName - name of region to scan
+ * @param columns - columns to scan
+ * @param startRow - starting row to scan
+ *
+ * @return scanner identifier to be used in subsequent next() and close() calls
+ * @throws IOException
+ */
+ public long openScanner(Text regionName, Text[] columns, Text startRow) throws IOException;
+
+ /**
+ * Get the next set of values
+ *
+ * @param scannerId - scanner identifier returned by openScanner
+ * @param key - an HStoreKey that will be filled in with the next row's key
+ * @return - array of column name/value pairs for that row
+ * @throws IOException
+ */
+ public LabelledData[] next(long scannerId, HStoreKey key) throws IOException;
+
+ /**
+ * Close a scanner
+ *
+ * @param scannerId - the scanner id returned by openScanner
+ * @throws IOException
+ */
+ public void close(long scannerId) throws IOException;
}
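
Taken together, these three methods define an id-based scan protocol that, unlike the removed openScanner(), can cross an RPC boundary. A client-side sketch; the "info:" column and empty start row are hypothetical arguments, and an empty next() result is assumed to signal exhaustion since the interface does not say:

    // Client-side scan loop against the new remote scanner interface.
    // 'server' is an HRegionInterface proxy (e.g. from RPC.waitForProxy).
    void scanRegion(HRegionInterface server, Text regionName) throws IOException {
      long scannerId = server.openScanner(regionName,
          new Text[] { new Text("info:") },    // hypothetical column family
          new Text(""));                       // empty start row = from the beginning
      try {
        HStoreKey key = new HStoreKey();
        for (LabelledData[] values = server.next(scannerId, key);
             values.length > 0;                // assumption: empty array means done
             values = server.next(scannerId, key)) {
          // ... process key.getRow() and the column/value pairs in 'values' ...
        }
      } finally {
        server.close(scannerId);               // always release the server-side scanner
      }
    }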
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (revision 530954)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (working copy)
@@ -15,6 +15,8 @@
*/
package org.apache.hadoop.hbase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.ipc.*;
@@ -22,6 +24,8 @@
import java.io.*;
import java.util.*;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
/*******************************************************************************
* HRegionServer makes a set of HRegions available to clients. It checks in with
@@ -27,7 +31,20 @@
* HRegionServer makes a set of HRegions available to clients. It checks in with
* the HMaster. There are many HRegionServers in a single HBase deployment.
******************************************************************************/
-public class HRegionServer implements HConstants, HRegionInterface, Runnable {
+public class HRegionServer
+ implements HConstants, HRegionInterface, Runnable {
+
+ public long getProtocolVersion(String protocol,
+ long clientVersion) throws IOException {
+ if (protocol.equals(HRegionInterface.class.getName())) {
+ return HRegionInterface.versionID;
+ } else {
+ throw new IOException("Unknown protocol to name node: " + protocol);
+ }
+ }
+
+ private static final Log LOG = LogFactory.getLog(HRegionServer.class);
+
private boolean stopRequested;
private Path regionDir;
private HServerAddress address;
@@ -34,7 +51,7 @@
private Configuration conf;
private Random rand;
private TreeMap<Text, HRegion> regions; // region name -> HRegion
- private HLocking locking;
+ private ReadWriteLock locker;
private Vector<HMsg> outboundMsgs;
private long threadWakeFrequency;
@@ -61,7 +78,7 @@
}
public void run() {
- while(!stopRequested) {
+ while(! stopRequested) {
long startTime = System.currentTimeMillis();
// Grab a list of regions to check
@@ -67,7 +84,7 @@
// Grab a list of regions to check
Vector<HRegion> checkSplit = new Vector<HRegion>();
- locking.obtainReadLock();
+ locker.readLock().lock();
try {
checkSplit.addAll(regions.values());
@@ -72,7 +89,7 @@
checkSplit.addAll(regions.values());
} finally {
- locking.releaseReadLock();
+ locker.readLock().unlock();
}
// Check to see if they need splitting
@@ -78,7 +95,7 @@
// Check to see if they need splitting
Vector<SplitRegion> toSplit = new Vector<SplitRegion>();
- for(Iterator<HRegion> it = checkSplit.iterator(); it.hasNext();) {
+ for(Iterator<HRegion> it = checkSplit.iterator(); it.hasNext(); ) {
HRegion cur = it.next();
Text midKey = new Text();
@@ -83,7 +100,7 @@
Text midKey = new Text();
try {
- if (cur.needsSplit(midKey)) {
+ if(cur.needsSplit(midKey)) {
toSplit.add(new SplitRegion(cur, midKey));
}
@@ -92,12 +109,12 @@
}
}
- for(Iterator<SplitRegion> it = toSplit.iterator(); it.hasNext();) {
+ for(Iterator<SplitRegion> it = toSplit.iterator(); it.hasNext(); ) {
SplitRegion r = it.next();
- locking.obtainWriteLock();
+ locker.writeLock().lock();
regions.remove(r.region.getRegionName());
- locking.releaseWriteLock();
+ locker.writeLock().unlock();
HRegion[] newRegions = null;
try {
@@ -103,6 +120,8 @@
try {
Text oldRegion = r.region.getRegionName();
+ LOG.info("splitting region: " + oldRegion);
+
newRegions = r.region.closeAndSplit(r.midKey);
// When a region is split, the META table needs to updated if we're
@@ -111,8 +130,10 @@
Text tableToUpdate
= (oldRegion.find(META_TABLE_NAME.toString()) == 0)
- ? ROOT_TABLE_NAME : META_TABLE_NAME;
+ ? ROOT_TABLE_NAME : META_TABLE_NAME;
+ LOG.debug("region split complete. updating meta");
+
client.openTable(tableToUpdate);
long lockid = client.startUpdate(oldRegion);
client.delete(lockid, META_COL_REGIONINFO);
@@ -132,7 +153,14 @@
// Now tell the master about the new regions
+ LOG.debug("reporting region split to master");
+
reportSplit(newRegions[0].getRegionInfo(), newRegions[1].getRegionInfo());
+
+ LOG.info("region split successful. old region=" + oldRegion
+ + ", new regions: " + newRegions[0].getRegionName() + ", "
+ + newRegions[1].getRegionName());
+
newRegions[0].close();
newRegions[1].close();
@@ -145,11 +173,15 @@
// Sleep
- long endTime = System.currentTimeMillis();
- try {
- Thread.sleep(splitCheckFrequency - (endTime - startTime));
-
- } catch(InterruptedException iex) {
+ long waitTime =
+ splitCheckFrequency - (System.currentTimeMillis() - startTime);
+
+ if(waitTime > 0) {
+ try {
+ Thread.sleep(waitTime);
+
+ } catch(InterruptedException iex) {
+ }
}
}
}
@@ -161,7 +193,7 @@
private Thread cacheFlusherThread;
private class Flusher implements Runnable {
public void run() {
- while(!stopRequested) {
+ while(! stopRequested) {
long startTime = System.currentTimeMillis();
// Grab a list of items to flush
@@ -167,7 +199,7 @@
// Grab a list of items to flush
Vector<HRegion> toFlush = new Vector<HRegion>();
- locking.obtainReadLock();
+ locker.readLock().lock();
try {
toFlush.addAll(regions.values());
@@ -172,7 +204,7 @@
toFlush.addAll(regions.values());
} finally {
- locking.releaseReadLock();
+ locker.readLock().unlock();
}
// Flush them, if necessary
@@ -177,7 +209,7 @@
// Flush them, if necessary
- for(Iterator<HRegion> it = toFlush.iterator(); it.hasNext();) {
+ for(Iterator<HRegion> it = toFlush.iterator(); it.hasNext(); ) {
HRegion cur = it.next();
try {
@@ -190,11 +222,15 @@
// Sleep
- long endTime = System.currentTimeMillis();
- try {
- Thread.sleep(threadWakeFrequency - (endTime - startTime));
-
- } catch(InterruptedException iex) {
+ long waitTime =
+ threadWakeFrequency - (System.currentTimeMillis() - startTime);
+
+ if(waitTime > 0) {
+ try {
+ Thread.sleep(waitTime);
+
+ } catch(InterruptedException iex) {
+ }
}
}
}
@@ -212,7 +248,7 @@
private Thread logRollerThread;
private class LogRoller implements Runnable {
public void run() {
- while(!stopRequested) {
+ while(! stopRequested) {
// If the number of log entries is high enough, roll the log. This is a
// very fast operation, but should not be done too frequently.
@@ -217,7 +253,7 @@
// If the number of log entries is high enough, roll the log. This is a
// very fast operation, but should not be done too frequently.
- if (log.getNumEntries() > maxLogEntries) {
+ if(log.getNumEntries() > maxLogEntries) {
try {
log.rollWriter();
@@ -249,8 +285,8 @@
/** Start a HRegionServer at the default location */
public HRegionServer(Configuration conf) throws IOException {
this(new Path(conf.get(HREGION_DIR, DEFAULT_HREGION_DIR)),
- new HServerAddress(conf.get("hbase.regionserver.default.name")),
- conf);
+ new HServerAddress(conf.get(REGIONSERVER_ADDRESS, "localhost:0")),
+ conf);
}
/** Start a HRegionServer at an indicated location */
@@ -255,7 +291,7 @@
/** Start a HRegionServer at an indicated location */
public HRegionServer(Path regionDir, HServerAddress address, Configuration conf)
- throws IOException {
+ throws IOException {
// Basic setup
@@ -261,12 +297,12 @@
this.stopRequested = false;
this.regionDir = regionDir;
- this.address = address;
this.conf = conf;
this.rand = new Random();
this.regions = new TreeMap<Text, HRegion>();
- this.locking = new HLocking();
+ this.locker = new ReentrantReadWriteLock();
this.outboundMsgs = new Vector<HMsg>();
+ this.scanners = Collections.synchronizedMap(new TreeMap<Text, HScannerInterface>());
// Config'ed params
@@ -278,7 +314,7 @@
// Cache flushing
this.cacheFlusher = new Flusher();
- this.cacheFlusherThread = new Thread(cacheFlusher);
+ this.cacheFlusherThread = new Thread(cacheFlusher, "HRegionServer.cacheFlusher");
// Check regions to see if they need to be split
@@ -283,21 +319,36 @@
// Check regions to see if they need to be split
this.splitChecker = new SplitChecker();
- this.splitCheckerThread = new Thread(splitChecker);
+ this.splitCheckerThread = new Thread(splitChecker, "HRegionServer.splitChecker");
+
+ // Process requests from Master
+
+ this.toDo = new Vector<HMsg>();
+ this.worker = new Worker();
+ this.workerThread = new Thread(worker, "HRegionServer.worker");
try {
+
+ // Server to handle client requests
+
+ this.server = RPC.getServer(this, address.getBindAddress().toString(),
+ address.getPort(), conf.getInt("hbase.hregionserver.handler.count", 10), false, conf);
+
+ this.address = new HServerAddress(server.getListenerAddress());
+
// Local file paths
- this.fs = FileSystem.get(conf);
- Path newlogdir = new Path(regionDir, "log" + "_" + address.toString());
- this.oldlogfile = new Path(regionDir, "oldlogfile" + "_" + address.toString());
+ String serverName = this.address.getBindAddress() + "_" + this.address.getPort();
+ Path newlogdir = new Path(regionDir, "log" + "_" + serverName);
+ this.oldlogfile = new Path(regionDir, "oldlogfile" + "_" + serverName);
// Logging
+ this.fs = FileSystem.get(conf);
HLog.consolidateOldLog(newlogdir, oldlogfile, fs, conf);
this.log = new HLog(fs, newlogdir, conf);
this.logRoller = new LogRoller();
- this.logRollerThread = new Thread(logRoller);
+ this.logRollerThread = new Thread(logRoller, "HRegionServer.logRoller");
// Remote HMaster
@@ -302,13 +353,14 @@
// Remote HMaster
this.hbaseMaster = (HMasterRegionInterface)
- RPC.waitForProxy(HMasterRegionInterface.class,
- HMasterRegionInterface.versionId,
- new HServerAddress(conf.get(MASTER_DEFAULT_NAME)).getInetSocketAddress(),
- conf);
+ RPC.waitForProxy(HMasterRegionInterface.class,
+ HMasterRegionInterface.versionID,
+ new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(),
+ conf);
// Threads
+ this.workerThread.start();
this.cacheFlusherThread.start();
this.splitCheckerThread.start();
this.logRollerThread.start();
@@ -313,12 +365,10 @@
this.splitCheckerThread.start();
this.logRollerThread.start();
this.leases = new Leases(conf.getLong("hbase.hregionserver.lease.period",
- 3 * 60 * 1000), threadWakeFrequency);
+ 3 * 60 * 1000), threadWakeFrequency);
// Server
- this.server = RPC.getServer(this, address.getBindAddress().toString(),
- address.getPort(), conf.getInt("hbase.hregionserver.handler.count", 10), false, conf);
this.server.start();
} catch(IOException e) {
@@ -325,6 +375,8 @@
this.stopRequested = true;
throw e;
}
+
+ LOG.info("HRegionServer started at: " + address.toString());
}
/**
@@ -334,7 +386,7 @@
* processing to cease.
*/
public void stop() throws IOException {
- if (!stopRequested) {
+ if(! stopRequested) {
stopRequested = true;
closeAllRegions();
@@ -342,7 +394,7 @@
fs.close();
server.stop();
}
-
+ LOG.info("stopping server at: " + address.toString());
}
/** Call join to wait for all the threads to finish */
@@ -348,6 +400,12 @@
/** Call join to wait for all the threads to finish */
public void join() {
try {
+ this.workerThread.join();
+
+ } catch(InterruptedException iex) {
+ }
+
+ try {
this.logRollerThread.join();
} catch(InterruptedException iex) {
@@ -366,7 +424,7 @@
} catch(InterruptedException iex) {
}
-
+ LOG.info("server stopped at: " + address.toString());
}
/**
@@ -375,7 +433,7 @@
* load/unload instructions.
*/
public void run() {
- while(!stopRequested) {
+ while(! stopRequested) {
HServerInfo info = new HServerInfo(address, rand.nextLong());
long lastMsg = 0;
long waitTime;
@@ -388,10 +446,12 @@
} catch(IOException e) {
waitTime = msgInterval - (System.currentTimeMillis() - lastMsg);
- try {
- Thread.sleep(waitTime);
-
- } catch(InterruptedException iex) {
+ if(waitTime > 0) {
+ try {
+ Thread.sleep(waitTime);
+
+ } catch(InterruptedException iex) {
+ }
}
continue;
}
@@ -398,8 +458,8 @@
// Now ask the master what it wants us to do and tell it what we have done.
- while(!stopRequested) {
- if ((System.currentTimeMillis() - lastMsg) >= msgInterval) {
+ while(! stopRequested) {
+ if((System.currentTimeMillis() - lastMsg) >= msgInterval) {
HMsg outboundArray[] = null;
synchronized(outboundMsgs) {
@@ -411,10 +471,33 @@
HMsg msgs[] = hbaseMaster.regionServerReport(info, outboundArray);
lastMsg = System.currentTimeMillis();
- // Process the HMaster's instruction stream
+ // Queue up the HMaster's instruction stream for processing
- if (!processMessages(msgs)) {
- break;
+ synchronized(toDo) {
+ boolean restartOrStop = false;
+ for(int i = 0; i < msgs.length; i++) {
+ switch(msgs[i].getMsg()) {
+
+ case HMsg.MSG_CALL_SERVER_STARTUP:
+ closeAllRegions();
+ restartOrStop = true;
+ break;
+
+ case HMsg.MSG_REGIONSERVER_ALREADY_RUNNING:
+ stop();
+ restartOrStop = true;
+ break;
+
+ default:
+ toDo.add(msgs[i]);
+ }
+ }
+ if(toDo.size() > 0) {
+ toDo.notifyAll();
+ }
+ if(restartOrStop) {
+ break;
+ }
}
} catch(IOException e) {
@@ -424,53 +507,14 @@
waitTime = msgInterval - (System.currentTimeMillis() - lastMsg);
- try {
- Thread.sleep(waitTime);
- } catch(InterruptedException iex) {
+ if(waitTime > 0) {
+ try {
+ Thread.sleep(waitTime);
+ } catch(InterruptedException iex) {
+ }
}
-
- }
- }
- }
-
- private boolean processMessages(HMsg[] msgs) throws IOException {
- for(int i = 0; i < msgs.length; i++) {
- switch(msgs[i].getMsg()) {
-
- case HMsg.MSG_REGION_OPEN: // Open a region
- openRegion(msgs[i].getRegionInfo());
- break;
-
- case HMsg.MSG_REGION_CLOSE: // Close a region
- closeRegion(msgs[i].getRegionInfo(), true);
- break;
-
- case HMsg.MSG_REGION_MERGE: // Merge two regions
- //TODO ???
- throw new IOException("TODO: need to figure out merge");
- //break;
-
- case HMsg.MSG_CALL_SERVER_STARTUP: // Close regions, restart
- closeAllRegions();
- return false;
-
- case HMsg.MSG_REGIONSERVER_ALREADY_RUNNING: // Go away
- stop();
- return false;
-
- case HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT: // Close a region, don't reply
- closeRegion(msgs[i].getRegionInfo(), false);
- break;
-
- case HMsg.MSG_REGION_CLOSE_AND_DELETE:
- closeAndDeleteRegion(msgs[i].getRegionInfo());
- break;
-
- default:
- throw new IOException("Impossible state during msg processing. Instruction: " + msgs[i]);
}
}
- return true;
}
/** Add to the outbound message buffer */
@@ -508,9 +552,68 @@
// HMaster-given operations
//////////////////////////////////////////////////////////////////////////////
+ private Vector<HMsg> toDo;
+ private Worker worker;
+ private Thread workerThread;
+ private class Worker implements Runnable {
+ public void run() {
+ while(!stopRequested) {
+ HMsg msg = null;
+ synchronized(toDo) {
+ while(toDo.size() == 0) {
+ try {
+ toDo.wait();
+
+ } catch(InterruptedException e) {
+ }
+ }
+ msg = toDo.remove(0);
+ }
+ try {
+ switch(msg.getMsg()) {
+
+ case HMsg.MSG_REGION_OPEN: // Open a region
+ openRegion(msg.getRegionInfo());
+ break;
+
+ case HMsg.MSG_REGION_CLOSE: // Close a region
+ closeRegion(msg.getRegionInfo(), true);
+ break;
+
+ case HMsg.MSG_REGION_MERGE: // Merge two regions
+ //TODO ???
+ throw new IOException("TODO: need to figure out merge");
+ //break;
+
+ case HMsg.MSG_CALL_SERVER_STARTUP: // Close regions, restart
+ closeAllRegions();
+ continue;
+
+ case HMsg.MSG_REGIONSERVER_ALREADY_RUNNING: // Go away
+ stop();
+ continue;
+
+ case HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT: // Close a region, don't reply
+ closeRegion(msg.getRegionInfo(), false);
+ break;
+
+ case HMsg.MSG_REGION_CLOSE_AND_DELETE:
+ closeAndDeleteRegion(msg.getRegionInfo());
+ break;
+
+ default:
+ throw new IOException("Impossible state during msg processing. Instruction: " + msg);
+ }
+ } catch(IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
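
The heartbeat loop in run() now only queues the master's instructions; this Worker drains them on its own thread using the classic synchronized/wait/notifyAll handoff. A self-contained sketch of that handoff, with generic Runnable tasks standing in for HMsg:

    import java.util.Vector;

    // Producer/consumer handoff: the producer appends under the queue's
    // monitor and calls notifyAll(); the consumer waits on the same
    // monitor while the queue is empty.
    public class MessageQueue {
      private final Vector<Runnable> toDo = new Vector<Runnable>();

      public void enqueue(Runnable task) {       // called from the heartbeat loop
        synchronized (toDo) {
          toDo.add(task);
          toDo.notifyAll();                      // wake the worker
        }
      }

      public Runnable dequeue() {                // called from the worker thread
        synchronized (toDo) {
          while (toDo.size() == 0) {
            try {
              toDo.wait();                       // releases the monitor while blocked
            } catch (InterruptedException e) {
            }
          }
          return toDo.remove(0);
        }
      }
    }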
private void openRegion(HRegionInfo regionInfo) throws IOException {
- locking.obtainWriteLock();
+ this.locker.writeLock().lock();
try {
HRegion region = new HRegion(regionDir, log, fs, conf, regionInfo, null, oldlogfile);
@@ -518,7 +621,7 @@
reportOpen(region);
} finally {
- locking.releaseWriteLock();
+ this.locker.writeLock().unlock();
}
}
@@ -523,16 +626,16 @@
}
private void closeRegion(HRegionInfo info, boolean reportWhenCompleted)
- throws IOException {
+ throws IOException {
- locking.obtainWriteLock();
+ this.locker.writeLock().lock();
try {
HRegion region = regions.remove(info.regionName);
- if (region != null) {
+ if(region != null) {
region.close();
- if (reportWhenCompleted) {
+ if(reportWhenCompleted) {
reportClose(region);
}
}
@@ -538,7 +641,7 @@
}
} finally {
- locking.releaseWriteLock();
+ this.locker.writeLock().unlock();
}
}
@@ -544,11 +647,11 @@
private void closeAndDeleteRegion(HRegionInfo info) throws IOException {
- locking.obtainWriteLock();
+ this.locker.writeLock().lock();
try {
HRegion region = regions.remove(info.regionName);
- if (region != null) {
+ if(region != null) {
region.closeAndDelete();
}
@@ -553,7 +656,7 @@
}
} finally {
- locking.releaseWriteLock();
+ this.locker.writeLock().unlock();
}
}
@@ -559,9 +662,9 @@
/** Called either when the master tells us to restart or from stop() */
private void closeAllRegions() throws IOException {
- locking.obtainWriteLock();
+ this.locker.writeLock().lock();
try {
- for(Iterator<HRegion> it = regions.values().iterator(); it.hasNext();) {
+ for(Iterator<HRegion> it = regions.values().iterator(); it.hasNext(); ) {
HRegion region = it.next();
region.close();
}
@@ -568,7 +671,7 @@
regions.clear();
} finally {
- locking.releaseWriteLock();
+ this.locker.writeLock().unlock();
}
}
@@ -580,24 +683,24 @@
*
* For now, we do not do merging. Splits are driven by the HRegionServer.
****************************************************************************/
- /*
- private void mergeRegions(Text regionNameA, Text regionNameB) throws IOException {
+/*
+ private void mergeRegions(Text regionNameA, Text regionNameB) throws IOException {
locking.obtainWriteLock();
try {
- HRegion srcA = regions.remove(regionNameA);
- HRegion srcB = regions.remove(regionNameB);
- HRegion newRegion = HRegion.closeAndMerge(srcA, srcB);
- regions.put(newRegion.getRegionName(), newRegion);
+ HRegion srcA = regions.remove(regionNameA);
+ HRegion srcB = regions.remove(regionNameB);
+ HRegion newRegion = HRegion.closeAndMerge(srcA, srcB);
+ regions.put(newRegion.getRegionName(), newRegion);
- reportClose(srcA);
- reportClose(srcB);
- reportOpen(newRegion);
+ reportClose(srcA);
+ reportClose(srcB);
+ reportOpen(newRegion);
} finally {
- locking.releaseWriteLock();
- }
+ locking.releaseWriteLock();
}
- */
+ }
+*/
//////////////////////////////////////////////////////////////////////////////
// HRegionInterface
@@ -606,7 +709,7 @@
/** Obtain a table descriptor for the given region */
public HRegionInfo getRegionInfo(Text regionName) {
HRegion region = getRegion(regionName);
- if (region == null) {
+ if(region == null) {
return null;
}
return region.getRegionInfo();
@@ -612,21 +715,10 @@
return region.getRegionInfo();
}
- /** Start a scanner for a given HRegion. */
- public HScannerInterface openScanner(Text regionName, Text[] cols,
- Text firstRow) throws IOException {
-
- HRegion r = getRegion(regionName);
- if (r == null) {
- throw new IOException("Not serving region " + regionName);
- }
- return r.getScanner(cols, firstRow);
- }
-
/** Get the indicated row/column */
public BytesWritable get(Text regionName, Text row, Text column) throws IOException {
HRegion region = getRegion(regionName);
- if (region == null) {
+ if(region == null) {
throw new IOException("Not serving region " + regionName);
}
@@ -631,7 +723,7 @@
}
byte results[] = region.get(row, column);
- if (results != null) {
+ if(results != null) {
return new BytesWritable(results);
}
return null;
@@ -639,10 +731,10 @@
/** Get multiple versions of the indicated row/col */
public BytesWritable[] get(Text regionName, Text row, Text column,
- int numVersions) throws IOException {
+ int numVersions) throws IOException {
HRegion region = getRegion(regionName);
- if (region == null) {
+ if(region == null) {
throw new IOException("Not serving region " + regionName);
}
@@ -647,10 +739,10 @@
}
byte results[][] = region.get(row, column, numVersions);
- if (results != null) {
+ if(results != null) {
BytesWritable realResults[] = new BytesWritable[results.length];
for(int i = 0; i < realResults.length; i++) {
- if (results[i] != null) {
+ if(results[i] != null) {
realResults[i] = new BytesWritable(results[i]);
}
}
@@ -661,10 +753,10 @@
/** Get multiple timestamped versions of the indicated row/col */
public BytesWritable[] get(Text regionName, Text row, Text column,
- long timestamp, int numVersions) throws IOException {
+ long timestamp, int numVersions) throws IOException {
HRegion region = getRegion(regionName);
- if (region == null) {
+ if(region == null) {
throw new IOException("Not serving region " + regionName);
}
@@ -669,10 +761,10 @@
}
byte results[][] = region.get(row, column, timestamp, numVersions);
- if (results != null) {
+ if(results != null) {
BytesWritable realResults[] = new BytesWritable[results.length];
for(int i = 0; i < realResults.length; i++) {
- if (results[i] != null) {
+ if(results[i] != null) {
realResults[i] = new BytesWritable(results[i]);
}
}
@@ -684,7 +776,7 @@
/** Get all the columns (along with their names) for a given row. */
public LabelledData[] getRow(Text regionName, Text row) throws IOException {
HRegion region = getRegion(regionName);
- if (region == null) {
+ if(region == null) {
throw new IOException("Not serving region " + regionName);
}
@@ -691,7 +783,7 @@
TreeMap<Text, byte[]> map = region.getFull(row);
LabelledData result[] = new LabelledData[map.size()];
int counter = 0;
- for(Iterator<Text> it = map.keySet().iterator(); it.hasNext();) {
+ for(Iterator<Text> it = map.keySet().iterator(); it.hasNext(); ) {
Text colname = it.next();
byte val[] = map.get(colname);
result[counter++] = new LabelledData(colname, val);
@@ -723,10 +815,10 @@
}
public long startUpdate(Text regionName, long clientid, Text row)
- throws IOException {
+ throws IOException {
HRegion region = getRegion(regionName);
- if (region == null) {
+ if(region == null) {
throw new IOException("Not serving region " + regionName);
}
@@ -732,8 +824,8 @@
long lockid = region.startUpdate(row);
leases.createLease(new Text(String.valueOf(clientid)),
- new Text(String.valueOf(lockid)),
- new RegionListener(region, lockid));
+ new Text(String.valueOf(lockid)),
+ new RegionListener(region, lockid));
return lockid;
}
@@ -740,10 +832,10 @@
/** Add something to the HBase. */
public void put(Text regionName, long clientid, long lockid, Text column,
- BytesWritable val) throws IOException {
+ BytesWritable val) throws IOException {
HRegion region = getRegion(regionName);
- if (region == null) {
+ if(region == null) {
throw new IOException("Not serving region " + regionName);
}
@@ -748,7 +840,7 @@
}
leases.renewLease(new Text(String.valueOf(clientid)),
- new Text(String.valueOf(lockid)));
+ new Text(String.valueOf(lockid)));
region.put(lockid, column, val.get());
}
@@ -755,10 +847,10 @@
/** Remove a cell from the HBase. */
public void delete(Text regionName, long clientid, long lockid, Text column)
- throws IOException {
+ throws IOException {
HRegion region = getRegion(regionName);
- if (region == null) {
+ if(region == null) {
throw new IOException("Not serving region " + regionName);
}
@@ -763,7 +855,7 @@
}
leases.renewLease(new Text(String.valueOf(clientid)),
- new Text(String.valueOf(lockid)));
+ new Text(String.valueOf(lockid)));
region.delete(lockid, column);
}
@@ -770,10 +862,10 @@
/** Abandon the transaction */
public void abort(Text regionName, long clientid, long lockid)
- throws IOException {
+ throws IOException {
HRegion region = getRegion(regionName);
- if (region == null) {
+ if(region == null) {
throw new IOException("Not serving region " + regionName);
}
@@ -778,7 +870,7 @@
}
leases.cancelLease(new Text(String.valueOf(clientid)),
- new Text(String.valueOf(lockid)));
+ new Text(String.valueOf(lockid)));
region.abort(lockid);
}
@@ -785,10 +877,10 @@
/** Confirm the transaction */
public void commit(Text regionName, long clientid, long lockid)
- throws IOException {
+ throws IOException {
HRegion region = getRegion(regionName);
- if (region == null) {
+ if(region == null) {
throw new IOException("Not serving region " + regionName);
}
@@ -793,7 +885,7 @@
}
leases.cancelLease(new Text(String.valueOf(clientid)),
- new Text(String.valueOf(lockid)));
+ new Text(String.valueOf(lockid)));
region.commit(lockid);
}
@@ -801,7 +893,7 @@
/** Don't let the client's lease expire just yet... */
public void renewLease(long lockid, long clientid) throws IOException {
leases.renewLease(new Text(String.valueOf(clientid)),
- new Text(String.valueOf(lockid)));
+ new Text(String.valueOf(lockid)));
}
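Taken together, the hunks above form the region server's update-lease protocol: startUpdate() creates a lease keyed by (clientid, lockid); put(), delete() and renewLease() renew it; commit() or abort() cancel it, so edits abandoned by a crashed client are reclaimed when the lease expires. A minimal client-side sketch of the happy path, assuming the caller already holds a proxy to this interface; the name server and the byte[] payload value are illustrative, not part of the patch:

    long lockid = server.startUpdate(regionName, clientid, row);
    // stage one cell under the row lock; the put also renews the lease
    server.put(regionName, clientid, lockid, column, new BytesWritable(value));
    // make the staged edits durable and cancel the lease
    server.commit(regionName, clientid, lockid);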
/** Private utility method for safely obtaining an HRegion handle. */
@@ -806,7 +898,7 @@
/** Private utility method for safely obtaining an HRegion handle. */
private HRegion getRegion(Text regionName) {
- locking.obtainReadLock();
+ this.locker.readLock().lock();
try {
return regions.get(regionName);
@@ -811,8 +903,121 @@
return regions.get(regionName);
} finally {
- locking.releaseReadLock();
+ this.locker.readLock().unlock();
+ }
+ }
+
+ //////////////////////////////////////////////////////////////////////////////
+ // remote scanner interface
+ //////////////////////////////////////////////////////////////////////////////
+
+ private Map<Text, HScannerInterface> scanners;
+ private class ScannerListener extends LeaseListener {
+ private Text scannerName;
+
+ public ScannerListener(Text scannerName) {
+ this.scannerName = scannerName;
+ }
+
+ public void leaseExpired() {
+ HScannerInterface s = scanners.remove(scannerName);
+ if(s != null) {
+ try {
+ s.close();
+
+ } catch(IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ /** Start a scanner for a given HRegion. */
+ public long openScanner(Text regionName, Text[] cols, Text firstRow)
+ throws IOException {
+
+ HRegion r = getRegion(regionName);
+ if(r == null) {
+ throw new IOException("Not serving region " + regionName);
+ }
+
+ long scannerId = -1L;
+ try {
+ HScannerInterface s = r.getScanner(cols, firstRow);
+ scannerId = rand.nextLong();
+ Text scannerName = new Text(String.valueOf(scannerId));
+ scanners.put(scannerName, s);
+ leases.createLease(scannerName, scannerName, new ScannerListener(scannerName));
+
+ } catch(IOException e) {
+ e.printStackTrace();
+ throw e;
+ }
+ return scannerId;
+ }
+
+ public LabelledData[] next(long scannerId, HStoreKey key) throws IOException {
+
+ Text scannerName = new Text(String.valueOf(scannerId));
+ HScannerInterface s = scanners.get(scannerName);
+ if(s == null) {
+ throw new IOException("unknown scanner");
+ }
+ leases.renewLease(scannerName, scannerName);
+ TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+ ArrayList<LabelledData> values = new ArrayList<LabelledData>();
+ if(s.next(key, results)) {
+ for(Iterator<Map.Entry<Text, byte[]>> it = results.entrySet().iterator();
+ it.hasNext(); ) {
+ Map.Entry<Text, byte[]> e = it.next();
+ values.add(new LabelledData(e.getKey(), e.getValue()));
+ }
+ }
+ return values.toArray(new LabelledData[values.size()]);
+ }
+
+ public void close(long scannerId) throws IOException {
+ Text scannerName = new Text(String.valueOf(scannerId));
+ HScannerInterface s = scanners.remove(scannerName);
+ if(s == null) {
+ throw new IOException("unknown scanner");
}
+ try {
+ s.close();
+
+ } catch(IOException ex) {
+ ex.printStackTrace();
+ }
+ leases.cancelLease(scannerName, scannerName);
}
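The scanner RPCs are lease-guarded the same way: openScanner() registers the new scanner under a lease named after its random id, next() renews that lease on every call, and close() or lease expiry tears the scanner down server-side. A minimal client loop against this interface, assuming a proxy named server plus placeholder regionName and cols values (none of these names come from the patch):

    HStoreKey key = new HStoreKey();
    long scannerId = server.openScanner(regionName, cols, new Text(""));
    try {
      LabelledData[] values = server.next(scannerId, key);
      while(values.length > 0) {
        // key now holds the row just fetched; values holds its cells
        values = server.next(scannerId, key);
      }
    } finally {
      server.close(scannerId);   // also cancels the scanner's lease
    }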
+ //////////////////////////////////////////////////////////////////////////////
+ // Main program
+ //////////////////////////////////////////////////////////////////////////////
+
+ private static void printUsage() {
+ System.err.println("Usage: java " +
+ "org.apache.hbase.HRegionServer [--bind=hostname:port]");
+ }
+
+ public static void main(String [] args) throws IOException {
+ Configuration conf = new HBaseConfiguration();
+
+ // Process command-line args. TODO: Better cmd-line processing
+ // (but hopefully something not as painful as cli options).
+ for (String cmd: args) {
+ if (cmd.equals("-h") || cmd.startsWith("--h")) {
+ printUsage();
+ return;
+ }
+
+ final String addressArgKey = "--bind=";
+ if (cmd.startsWith(addressArgKey)) {
+ conf.set(REGIONSERVER_ADDRESS,
+ cmd.substring(addressArgKey.length()));
+ }
+ }
+
+ new HRegionServer(conf);
+ }
}
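With this main() in place the region server can be started standalone; the interface and port below are illustrative, not defaults defined by the patch:

    java org.apache.hadoop.hbase.HRegionServer --bind=0.0.0.0:60010

Anything other than -h/--h* and --bind=hostname:port falls through the argument loop, so mistyped flags are silently ignored.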
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerAddress.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerAddress.java (revision 530954)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HServerAddress.java (working copy)
@@ -33,9 +33,15 @@
this.stringValue = null;
}
+ public HServerAddress(InetSocketAddress address) {
+ this.address = address;
+ this.stringValue = address.getAddress().getHostAddress()
+ + ":" + address.getPort();
+ }
+
public HServerAddress(String hostAndPort) {
int colonIndex = hostAndPort.indexOf(':');
- if (colonIndex < 0) {
+ if(colonIndex < 0) {
throw new IllegalArgumentException("Not a host:port pair: " + hostAndPort);
}
String host = hostAndPort.substring(0, colonIndex);
@@ -80,7 +86,7 @@
String bindAddress = in.readUTF();
int port = in.readInt();
- if (bindAddress == null || bindAddress.length() == 0) {
+ if(bindAddress == null || bindAddress.length() == 0) {
address = null;
stringValue = null;
@@ -91,7 +97,7 @@
}
public void write(DataOutput out) throws IOException {
- if (address == null) {
+ if(address == null) {
out.writeUTF("");
out.writeInt(0);
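The new InetSocketAddress constructor writes stringValue in the same host:port shape that the String constructor parses, so either form can name the same endpoint. A small sketch, with an illustrative loopback address and port:

    HServerAddress fromSocket =
        new HServerAddress(new InetSocketAddress("127.0.0.1", 60010));
    HServerAddress fromString = new HServerAddress("127.0.0.1:60010");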
Index: src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
===================================================================
--- src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (revision 530954)
+++ src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (working copy)
@@ -15,14 +15,26 @@
*/
package org.apache.hadoop.hbase;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.Vector;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.fs.*;
-
-import java.io.*;
-import java.util.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
/*******************************************************************************
* HStore maintains a bunch of data files. It is responsible for maintaining
@@ -53,7 +65,7 @@
Integer compactLock = new Integer(0);
Integer flushLock = new Integer(0);
- HLocking locking = new HLocking();
+ ReadWriteLock locker = new ReentrantReadWriteLock();
TreeMap<Long, MapFile.Reader> maps = new TreeMap<Long, MapFile.Reader>();
TreeMap<Long, HStoreFile> mapFiles = new TreeMap<Long, HStoreFile>();
@@ -88,7 +100,7 @@
* will be deleted (by whoever has instantiated the HStore).
*/
public HStore(Path dir, Text regionName, Text colFamily, int maxVersions,
- FileSystem fs, Path reconstructionLog, Configuration conf) throws IOException {
+ FileSystem fs, Path reconstructionLog, Configuration conf) throws IOException {
this.dir = dir;
this.regionName = regionName;
@@ -110,7 +122,7 @@
this.compactdir = new Path(dir, COMPACTION_DIR);
Path curCompactStore = HStoreFile.getHStoreDir(compactdir, regionName, colFamily);
- if (fs.exists(curCompactStore)) {
+ if(fs.exists(curCompactStore)) {
processReadyCompaction();
fs.delete(curCompactStore);
}
@@ -123,7 +135,7 @@
Vector<HStoreFile> hstoreFiles
= HStoreFile.loadHStoreFiles(conf, dir, regionName, colFamily, fs);
- for(Iterator<HStoreFile> it = hstoreFiles.iterator(); it.hasNext();) {
+ for(Iterator<HStoreFile> it = hstoreFiles.iterator(); it.hasNext(); ) {
HStoreFile hsf = it.next();
mapFiles.put(hsf.loadInfo(fs), hsf);
}
@@ -138,11 +150,11 @@
// contain any updates also contained in the log.
long maxSeqID = -1;
- for(Iterator<HStoreFile> it = hstoreFiles.iterator(); it.hasNext();) {
+ for(Iterator<HStoreFile> it = hstoreFiles.iterator(); it.hasNext(); ) {
HStoreFile hsf = it.next();
long seqid = hsf.loadInfo(fs);
- if (seqid > 0) {
- if (seqid > maxSeqID) {
+ if(seqid > 0) {
+ if(seqid > maxSeqID) {
maxSeqID = seqid;
}
}
@@ -157,7 +169,7 @@
LOG.debug("reading reconstructionLog");
- if (reconstructionLog != null && fs.exists(reconstructionLog)) {
+ if(reconstructionLog != null && fs.exists(reconstructionLog)) {
long maxSeqIdInLog = -1;
TreeMap<HStoreKey, BytesWritable> reconstructedCache
= new TreeMap<HStoreKey, BytesWritable>();
@@ -170,11 +182,11 @@
HLogEdit val = new HLogEdit();
while(login.next(key, val)) {
maxSeqIdInLog = Math.max(maxSeqIdInLog, key.getLogSeqNum());
- if (key.getLogSeqNum() <= maxSeqID) {
+ if(key.getLogSeqNum() <= maxSeqID) {
continue;
}
reconstructedCache.put(new HStoreKey(key.getRow(), val.getColumn(),
- val.getTimestamp()), val.getVal());
+ val.getTimestamp()), val.getVal());
}
} finally {
@@ -181,7 +193,7 @@
login.close();
}
- if (reconstructedCache.size() > 0) {
+ if(reconstructedCache.size() > 0) {
// We create a "virtual flush" at maxSeqIdInLog+1.
@@ -195,7 +207,7 @@
// should be "timeless"; that is, it should not have an associated seq-ID,
// because all log messages have been reflected in the TreeMaps at this point.
- if (mapFiles.size() >= 1) {
+ if(mapFiles.size() >= 1) {
compactHelper(true);
}
@@ -204,7 +216,7 @@
LOG.debug("starting map readers");
- for(Iterator<Long> it = mapFiles.keySet().iterator(); it.hasNext();) {
+ for(Iterator<Long> it = mapFiles.keySet().iterator(); it.hasNext(); ) {
Long key = it.next().longValue();
HStoreFile hsf = mapFiles.get(key);
@@ -218,11 +230,11 @@
/** Turn off all the MapFile readers */
public void close() throws IOException {
- locking.obtainWriteLock();
+ this.locker.writeLock().lock();
LOG.info("closing HStore for " + this.regionName + "/" + this.colFamily);
try {
- for(Iterator<MapFile.Reader> it = maps.values().iterator(); it.hasNext();) {
+ for(Iterator<MapFile.Reader> it = maps.values().iterator(); it.hasNext(); ) {
MapFile.Reader map = it.next();
map.close();
}
@@ -232,7 +244,7 @@
LOG.info("HStore closed for " + this.regionName + "/" + this.colFamily);
} finally {
- locking.releaseWriteLock();
+ this.locker.writeLock().unlock();
}
}
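HStore gets the same locking conversion as the other classes: the hand-rolled HLocking gives way to java.util.concurrent.locks.ReentrantReadWriteLock, always in the lock-then-try/finally shape so an exception cannot leak a held lock. The idiom in miniature (the commented body stands in for whatever shared state is being guarded):

    ReadWriteLock locker = new ReentrantReadWriteLock();

    locker.readLock().lock();    // any number of readers may hold this at once
    try {
      // read the shared structures
    } finally {
      locker.readLock().unlock();
    }

writeLock() is the exclusive half: acquiring it waits for active readers to drain and blocks new readers and writers until it is released.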
@@ -252,7 +264,7 @@
* Return the entire list of HStoreFiles currently used by the HStore.
*/
public Vector<HStoreFile> flushCache(TreeMap<HStoreKey, BytesWritable> inputCache,
- long logCacheFlushId) throws IOException {
+ long logCacheFlushId) throws IOException {
return flushCacheHelper(inputCache, logCacheFlushId, true);
}
@@ -258,7 +270,7 @@
}
Vector<HStoreFile> flushCacheHelper(TreeMap<HStoreKey, BytesWritable> inputCache,
- long logCacheFlushId, boolean addToAvailableMaps) throws IOException {
+ long logCacheFlushId, boolean addToAvailableMaps) throws IOException {
synchronized(flushLock) {
LOG.debug("flushing HStore " + this.regionName + "/" + this.colFamily);
@@ -270,12 +282,12 @@
Path mapfile = flushedFile.getMapFilePath();
MapFile.Writer out = new MapFile.Writer(conf, fs, mapfile.toString(),
- HStoreKey.class, BytesWritable.class);
+ HStoreKey.class, BytesWritable.class);
try {
- for(Iterator<HStoreKey> it = inputCache.keySet().iterator(); it.hasNext();) {
+ for(Iterator<HStoreKey> it = inputCache.keySet().iterator(); it.hasNext(); ) {
HStoreKey curkey = it.next();
- if (this.colFamily.equals(HStoreKey.extractFamily(curkey.getColumn()))) {
+ if(this.colFamily.equals(HStoreKey.extractFamily(curkey.getColumn()))) {
BytesWritable val = inputCache.get(curkey);
out.append(curkey, val);
}
@@ -294,8 +306,8 @@
// C. Finally, make the new MapFile available.
- if (addToAvailableMaps) {
- locking.obtainWriteLock();
+ if(addToAvailableMaps) {
+ this.locker.writeLock().lock();
try {
maps.put(logCacheFlushId, new MapFile.Reader(fs, mapfile.toString(), conf));
@@ -303,7 +315,7 @@
LOG.debug("HStore available for " + this.regionName + "/" + this.colFamily);
} finally {
- locking.releaseWriteLock();
+ this.locker.writeLock().unlock();
}
}
return getAllMapFiles();
@@ -312,7 +324,7 @@
public Vector<HStoreFile> getAllMapFiles() {
Vector<HStoreFile> flushedFiles = new Vector<HStoreFile>();
- for(Iterator<HStoreFile> it = mapFiles.values().iterator(); it.hasNext();) {
+ for(Iterator<HStoreFile> it = mapFiles.values().iterator(); it.hasNext(); ) {
HStoreFile hsf = it.next();
flushedFiles.add(hsf);
}
@@ -355,7 +367,7 @@
// Grab a list of files to compact.
Vector<HStoreFile> toCompactFiles = null;
- locking.obtainWriteLock();
+ this.locker.writeLock().lock();
try {
toCompactFiles = new Vector<HStoreFile>(mapFiles.values());
@@ -360,7 +372,7 @@
toCompactFiles = new Vector<HStoreFile>(mapFiles.values());
} finally {
- locking.releaseWriteLock();
+ this.locker.writeLock().unlock();
}
// Compute the max-sequenceID seen in any of the to-be-compacted TreeMaps
@@ -366,11 +378,11 @@
// Compute the max-sequenceID seen in any of the to-be-compacted TreeMaps
long maxSeenSeqID = -1;
- for(Iterator<HStoreFile> it = toCompactFiles.iterator(); it.hasNext();) {
+ for(Iterator<HStoreFile> it = toCompactFiles.iterator(); it.hasNext(); ) {
HStoreFile hsf = it.next();
long seqid = hsf.loadInfo(fs);
- if (seqid > 0) {
- if (seqid > maxSeenSeqID) {
+ if(seqid > 0) {
+ if(seqid > maxSeenSeqID) {
maxSeenSeqID = seqid;
}
}
@@ -380,11 +392,11 @@
HStoreFile compactedOutputFile
= new HStoreFile(conf, compactdir, regionName, colFamily, -1);
- if (toCompactFiles.size() == 1) {
+ if(toCompactFiles.size() == 1) {
LOG.debug("nothing to compact for " + this.regionName + "/" + this.colFamily);
HStoreFile hsf = toCompactFiles.elementAt(0);
- if (hsf.loadInfo(fs) == -1) {
+ if(hsf.loadInfo(fs) == -1) {
return;
}
}
@@ -392,8 +404,8 @@
// Step through them, writing to the brand-new TreeMap
MapFile.Writer compactedOut = new MapFile.Writer(conf, fs,
- compactedOutputFile.getMapFilePath().toString(), HStoreKey.class,
- BytesWritable.class);
+ compactedOutputFile.getMapFilePath().toString(), HStoreKey.class,
+ BytesWritable.class);
try {
@@ -414,7 +426,7 @@
BytesWritable[] vals = new BytesWritable[toCompactFiles.size()];
boolean[] done = new boolean[toCompactFiles.size()];
int pos = 0;
- for(Iterator<HStoreFile> it = toCompactFiles.iterator(); it.hasNext();) {
+ for(Iterator<HStoreFile> it = toCompactFiles.iterator(); it.hasNext(); ) {
HStoreFile hsf = it.next();
readers[pos] = new MapFile.Reader(fs, hsf.getMapFilePath().toString(), conf);
keys[pos] = new HStoreKey();
@@ -431,8 +443,8 @@
int numDone = 0;
for(int i = 0; i < readers.length; i++) {
readers[i].reset();
- done[i] = !readers[i].next(keys[i], vals[i]);
- if (done[i]) {
+ done[i] = ! readers[i].next(keys[i], vals[i]);
+ if(done[i]) {
numDone++;
}
}
@@ -446,15 +458,15 @@
int smallestKey = -1;
for(int i = 0; i < readers.length; i++) {
- if (done[i]) {
+ if(done[i]) {
continue;
}
- if (smallestKey < 0) {
+ if(smallestKey < 0) {
smallestKey = i;
} else {
- if (keys[i].compareTo(keys[smallestKey]) < 0) {
+ if(keys[i].compareTo(keys[smallestKey]) < 0) {
smallestKey = i;
}
}
@@ -463,7 +475,7 @@
// Reflect the current key/val in the output
HStoreKey sk = keys[smallestKey];
- if (lastRow.equals(sk.getRow())
+ if(lastRow.equals(sk.getRow())
&& lastColumn.equals(sk.getColumn())) {
timesSeen++;
@@ -472,7 +484,7 @@
timesSeen = 1;
}
- if (timesSeen <= maxVersions) {
+ if(timesSeen <= maxVersions) {
// Keep old versions until we have maxVersions worth.
// Then just skip them.
@@ -477,7 +489,7 @@
// Keep old versions until we have maxVersions worth.
// Then just skip them.
- if (sk.getRow().getLength() != 0
+ if(sk.getRow().getLength() != 0
&& sk.getColumn().getLength() != 0) {
// Only write out objects which have a non-zero length key and value
@@ -499,7 +511,7 @@
// Advance the smallest key. If that reader's all finished, then
// mark it as done.
- if (!readers[smallestKey].next(keys[smallestKey], vals[smallestKey])) {
+ if(! readers[smallestKey].next(keys[smallestKey], vals[smallestKey])) {
done[smallestKey] = true;
readers[smallestKey].close();
numDone++;
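Spread across the hunks above, the compaction body is a k-way merge: each pass scans the readers' current keys for the smallest, emits that entry unless maxVersions cells for the same row/column were already written, then advances only the reader it came from; ties on equal keys keep the lowest-numbered reader, so the order of toCompactFiles decides which copy of a duplicate cell wins. The same selection loop in self-contained form, with sorted String arrays standing in for the MapFile readers and a print standing in for the version-limited emit:

    String[][] streams = { {"a", "c"}, {"b", "c", "d"} };   // pre-sorted inputs
    int[] pos = new int[streams.length];
    int numDone = 0;
    while(numDone < streams.length) {
      int smallest = -1;
      for(int i = 0; i < streams.length; i++) {
        if(pos[i] >= streams[i].length) {
          continue;                          // this stream is exhausted
        }
        if(smallest < 0
            || streams[i][pos[i]].compareTo(streams[smallest][pos[smallest]]) < 0) {
          smallest = i;                      // strict < keeps the lower index on ties
        }
      }
      System.out.println(streams[smallest][pos[smallest]]); // "emit" smallest key
      if(++pos[smallest] >= streams[smallest].length) {
        numDone++;
      }
    }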
@@ -516,7 +528,7 @@
// Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
- if ((!deleteSequenceInfo) && maxSeenSeqID >= 0) {
+ if((! deleteSequenceInfo) && maxSeenSeqID >= 0) {
compactedOutputFile.writeInfo(fs, maxSeenSeqID);
} else {
@@ -529,7 +541,7 @@
DataOutputStream out = new DataOutputStream(fs.create(filesToReplace));
try {
out.writeInt(toCompactFiles.size());
- for(Iterator