Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java (revision 988213) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java (working copy) @@ -93,7 +93,7 @@ SplitTransaction st = new SplitTransaction(this.parent, GOOD_SPLIT_ROW); assertTrue(st.prepare()); // Assert the write lock is held on successful prepare as the javadoc asserts. - assertTrue(this.parent.splitsAndClosesLock.writeLock().isHeldByCurrentThread()); + assertTrue(this.parent.lock.writeLock().isHeldByCurrentThread()); return st; } @@ -162,7 +162,7 @@ } assertEquals(rowcount, daughtersRowCount); // Assert the write lock is no longer held on parent - assertTrue(!this.parent.splitsAndClosesLock.writeLock().isHeldByCurrentThread()); + assertTrue(!this.parent.lock.writeLock().isHeldByCurrentThread()); } @Test public void testRollback() throws IOException { @@ -194,7 +194,7 @@ // Assert rollback cleaned up stuff in fs assertTrue(!this.fs.exists(HRegion.getRegionDir(this.testdir, st.getFirstDaughter()))); assertTrue(!this.fs.exists(HRegion.getRegionDir(this.testdir, st.getSecondDaughter()))); - assertTrue(!this.parent.splitsAndClosesLock.writeLock().isHeldByCurrentThread()); + assertTrue(!this.parent.lock.writeLock().isHeldByCurrentThread()); // Now retry the split but do not throw an exception this time. 
assertTrue(st.prepare()); @@ -215,7 +215,7 @@ } assertEquals(rowcount, daughtersRowCount); // Assert the write lock is no longer held on parent - assertTrue(!this.parent.splitsAndClosesLock.writeLock().isHeldByCurrentThread()); + assertTrue(!this.parent.lock.writeLock().isHeldByCurrentThread()); } /** Index: src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java (revision 988213) +++ src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java (working copy) @@ -32,7 +32,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.KeyValue.KVComparator; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.Reference.Range; @@ -133,7 +132,7 @@ */ public boolean prepare() { boolean prepared = false; - this.parent.splitsAndClosesLock.writeLock().lock(); + this.parent.lock.writeLock().lock(); try { if (this.parent.isClosed() || this.parent.isClosing()) return prepared; HRegionInfo hri = this.parent.getRegionInfo(); @@ -153,7 +152,7 @@ false, rid); prepared = true; } finally { - if (!prepared) this.parent.splitsAndClosesLock.writeLock().unlock(); + if (!prepared) this.parent.lock.writeLock().unlock(); } return prepared; } @@ -198,7 +197,7 @@ PairOfSameType execute(final OnlineRegions or, final boolean updateMeta) throws IOException { LOG.info("Starting split of region " + this.parent); - if (!this.parent.splitsAndClosesLock.writeLock().isHeldByCurrentThread()) { + if (!this.parent.lock.writeLock().isHeldByCurrentThread()) { throw new SplitAndCloseWriteLockNotHeld(); } @@ -274,7 +273,7 @@ if (t != null) t.close(); // Unlock if successful split. 
- this.parent.splitsAndClosesLock.writeLock().unlock(); + this.parent.lock.writeLock().unlock(); // Leaving here, the splitdir with its dross will be in place but since the // split was successful, just leave it; it'll be cleaned when parent is @@ -447,7 +446,7 @@ * @throws IOException If thrown, rollback failed. Take drastic action. */ public void rollback(final OnlineRegions or) throws IOException { - if (!this.parent.splitsAndClosesLock.writeLock().isHeldByCurrentThread()) { + if (!this.parent.lock.writeLock().isHeldByCurrentThread()) { throw new SplitAndCloseWriteLockNotHeld(); } FileSystem fs = this.parent.getFilesystem(); @@ -487,8 +486,8 @@ throw new RuntimeException("Unhandled journal entry: " + je); } } - if (this.parent.splitsAndClosesLock.writeLock().isHeldByCurrentThread()) { - this.parent.splitsAndClosesLock.writeLock().unlock(); + if (this.parent.lock.writeLock().isHeldByCurrentThread()) { + this.parent.lock.writeLock().unlock(); } } Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 988213) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -34,7 +34,6 @@ import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -81,7 +80,6 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.StringUtils; -import org.eclipse.jdt.core.dom.ThisExpression; import com.google.common.collect.Lists; @@ -217,10 +215,8 @@ private final long blockingMemStoreSize; final long threadWakeFrequency; // Used to guard splits and closes - final ReentrantReadWriteLock splitsAndClosesLock = + final ReentrantReadWriteLock lock 
= new ReentrantReadWriteLock(); - private final ReentrantReadWriteLock newScannerLock = - new ReentrantReadWriteLock(); // Stop updates lock private final ReentrantReadWriteLock updatesLock = @@ -503,39 +499,28 @@ LOG.info("Running close preflush of " + this.getRegionNameAsString()); internalFlushcache(); } - newScannerLock.writeLock().lock(); this.closing.set(true); + lock.writeLock().lock(); try { - splitsAndClosesLock.writeLock().lock(); if (this.isClosed()) { // SplitTransaction handles the null return null; } - LOG.debug("Updates disabled for region, no outstanding scanners on " + this); - try { - // Write lock means no more row locks can be given out. Wait on - // outstanding row locks to come in before we close so we do not drop - // outstanding updates. - waitOnRowLocks(); - LOG.debug("No more row locks outstanding on region " + this); + LOG.debug("Updates disabled for region " + this); + // Don't flush the cache if we are aborting + if (!abort) { + internalFlushcache(); + } - // Don't flush the cache if we are aborting - if (!abort) { - internalFlushcache(); - } - - List result = new ArrayList(); - for (Store store: stores.values()) { - result.addAll(store.close()); - } - this.closed.set(true); - LOG.info("Closed " + this); - return result; - } finally { - splitsAndClosesLock.writeLock().unlock(); + List result = new ArrayList(); + for (Store store : stores.values()) { + result.addAll(store.close()); } + this.closed.set(true); + LOG.info("Closed " + this); + return result; } finally { - newScannerLock.writeLock().unlock(); + lock.writeLock().unlock(); } } @@ -703,12 +688,16 @@ */ byte [] compactStores(final boolean majorCompaction) throws IOException { - if (this.closing.get() || this.closed.get()) { - LOG.debug("Skipping compaction on " + this + " because closing/closed"); + if (this.closing.get()) { + LOG.debug("Skipping compaction on " + this + " because closing"); return null; } - splitsAndClosesLock.readLock().lock(); + lock.readLock().lock(); try { + 
if (this.closed.get()) { + LOG.debug("Skipping compaction on " + this + " because closed"); + return null; + } byte [] splitRow = null; if (this.closed.get()) { return splitRow; @@ -747,7 +736,7 @@ } return splitRow; } finally { - splitsAndClosesLock.readLock().unlock(); + lock.readLock().unlock(); } } @@ -772,36 +761,41 @@ * because a Snapshot was not properly persisted. */ public boolean flushcache() throws IOException { - if (this.closed.get()) { + // fail-fast instead of waiting on the lock + if (this.closing.get()) { + LOG.debug("Skipping flush on " + this + " because closing"); return false; } - synchronized (writestate) { - if (!writestate.flushing && writestate.writesEnabled) { - this.writestate.flushing = true; - } else { - if(LOG.isDebugEnabled()) { - LOG.debug("NOT flushing memstore for region " + this + - ", flushing=" + - writestate.flushing + ", writesEnabled=" + - writestate.writesEnabled); - } + lock.readLock().lock(); + try { + if (this.closed.get()) { + LOG.debug("Skipping flush on " + this + " because closed"); return false; } - } - try { - // Prevent splits and closes - splitsAndClosesLock.readLock().lock(); try { + synchronized (writestate) { + if (!writestate.flushing && writestate.writesEnabled) { + this.writestate.flushing = true; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("NOT flushing memstore for region " + this + + ", flushing=" + + writestate.flushing + ", writesEnabled=" + + writestate.writesEnabled); + } + return false; + } + } return internalFlushcache(); } finally { - splitsAndClosesLock.readLock().unlock(); + synchronized (writestate) { + writestate.flushing = false; + this.writestate.flushRequested = false; + writestate.notifyAll(); + } } } finally { - synchronized (writestate) { - writestate.flushing = false; - this.writestate.flushRequested = false; - writestate.notifyAll(); - } + lock.readLock().unlock(); } } @@ -915,38 +909,14 @@ for (StoreFlusher flusher : storeFlushers) { flusher.flushCache(); } - - Callable 
atomicWork = internalPreFlushcacheCommit(); - - LOG.debug("Caches flushed, doing commit now (which includes update scanners)"); - - /** - * Switch between memstore(snapshot) and the new store file - */ - if (atomicWork != null) { - LOG.debug("internalPreFlushcacheCommit gives us work to do, acquiring newScannerLock"); - newScannerLock.writeLock().lock(); - } - - try { - if (atomicWork != null) { - atomicWork.call(); + // Switch snapshot (in memstore) -> new hfile (thus causing + // all the store scanners to reset/reseek). + for (StoreFlusher flusher : storeFlushers) { + boolean needsCompaction = flusher.commit(); + if (needsCompaction) { + compactionRequested = true; } - - // Switch snapshot (in memstore) -> new hfile (thus causing - // all the store scanners to reset/reseek). - for (StoreFlusher flusher : storeFlushers) { - boolean needsCompaction = flusher.commit(); - if (needsCompaction) { - compactionRequested = true; - } - } - } finally { - if (atomicWork != null) { - newScannerLock.writeLock().unlock(); - } } - storeFlushers.clear(); // Set down the memstore size by amount of flush. @@ -996,20 +966,6 @@ } /** - * A hook for sub classed wishing to perform operations prior to the cache - * flush commit stage. - * - * If a subclass wishes that an atomic update of their work and the - * flush commit stage happens, they should return a callable. The new scanner - * lock will be acquired and released. - - * @throws java.io.IOException allow children to throw exception - */ - protected Callable internalPreFlushcacheCommit() throws IOException { - return null; - } - - /** * Get the sequence number to be associated with this cache flush. Used by * TransactionalRegion to not complete pending transactions. 
* @@ -1054,7 +1010,7 @@ // closest key is across all column families, since the data may be sparse KeyValue key = null; checkRow(row); - splitsAndClosesLock.readLock().lock(); + startRegionOperation(); try { Store store = getStore(family); KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP); @@ -1067,7 +1023,7 @@ get.addFamily(family); return get(get, null); } finally { - splitsAndClosesLock.readLock().unlock(); + closeRegionOperation(); } } @@ -1087,11 +1043,8 @@ } protected InternalScanner getScanner(Scan scan, List additionalScanners) throws IOException { - newScannerLock.readLock().lock(); + startRegionOperation(); try { - if (this.closed.get()) { - throw new NotServingRegionException("Region " + this + " closed"); - } // Verify families are all valid if(scan.hasFamilies()) { for(byte [] family : scan.getFamilyMap().keySet()) { @@ -1105,7 +1058,7 @@ return instantiateInternalScanner(scan, additionalScanners); } finally { - newScannerLock.readLock().unlock(); + closeRegionOperation(); } } @@ -1147,7 +1100,7 @@ checkReadOnly(); checkResources(); Integer lid = null; - splitsAndClosesLock.readLock().lock(); + startRegionOperation(); try { byte [] row = delete.getRow(); // If we did not pass an existing row lock, obtain a new one @@ -1159,7 +1112,7 @@ } finally { if(lockid == null) releaseRowLock(lid); - splitsAndClosesLock.readLock().unlock(); + closeRegionOperation(); } } @@ -1292,8 +1245,7 @@ // read lock, resources may run out. For now, the thought is that this // will be extremely rare; we'll deal with it when it happens. checkResources(); - splitsAndClosesLock.readLock().lock(); - + startRegionOperation(); try { // We obtain a per-row lock, so other clients will block while one client // performs an update. 
The read lock is released by the client calling @@ -1311,7 +1263,7 @@ if(lockid == null) releaseRowLock(lid); } } finally { - splitsAndClosesLock.readLock().unlock(); + closeRegionOperation(); } } @@ -1364,12 +1316,12 @@ checkResources(); long newSize; - splitsAndClosesLock.readLock().lock(); + startRegionOperation(); try { long addedSize = doMiniBatchPut(batchOp); newSize = memstoreSize.addAndGet(addedSize); } finally { - splitsAndClosesLock.readLock().unlock(); + closeRegionOperation(); } if (isFlushSize(newSize)) { requestFlush(); @@ -1507,7 +1459,7 @@ if (!isPut && !(w instanceof Delete)) throw new IOException("Action must be Put or Delete"); - splitsAndClosesLock.readLock().lock(); + startRegionOperation(); try { RowLock lock = isPut ? ((Put)w).getRowLock() : ((Delete)w).getRowLock(); Get get = new Get(row, lock); @@ -1545,7 +1497,7 @@ if(lockId == null) releaseRowLock(lid); } } finally { - splitsAndClosesLock.readLock().unlock(); + closeRegionOperation(); } } @@ -2020,7 +1972,12 @@ * @return The id of the held lock. 
*/ public Integer obtainRowLock(final byte [] row) throws IOException { - return internalObtainRowLock(row, true); + startRegionOperation(); + try { + return internalObtainRowLock(row, true); + } finally { + closeRegionOperation(); + } } /** @@ -2030,7 +1987,12 @@ * @see HRegion#obtainRowLock(byte[]) */ public Integer tryObtainRowLock(final byte[] row) throws IOException { - return internalObtainRowLock(row, false); + startRegionOperation(); + try { + return internalObtainRowLock(row, false); + } finally { + closeRegionOperation(); + } } /** @@ -2042,11 +2004,8 @@ private Integer internalObtainRowLock(final byte[] row, boolean waitForLock) throws IOException { checkRow(row); - splitsAndClosesLock.readLock().lock(); + startRegionOperation(); try { - if (this.closed.get()) { - throw new NotServingRegionException(this + " is closed"); - } synchronized (lockedRows) { while (lockedRows.contains(row)) { if (!waitForLock) { @@ -2080,7 +2039,7 @@ return lockId; } } finally { - splitsAndClosesLock.readLock().unlock(); + closeRegionOperation(); } } @@ -2144,24 +2103,9 @@ return lid; } - private void waitOnRowLocks() { - synchronized (lockedRows) { - while (!this.lockedRows.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Waiting on " + this.lockedRows.size() + " row locks"); - } - try { - this.lockedRows.wait(); - } catch (InterruptedException e) { - // Catch. Let while test determine loop-end. - } - } - } - } - public void bulkLoadHFile(String hfilePath, byte[] familyName) throws IOException { - splitsAndClosesLock.readLock().lock(); + startRegionOperation(); try { Store store = getStore(familyName); if (store == null) { @@ -2170,7 +2114,7 @@ } store.bulkLoadHFile(hfilePath); } finally { - splitsAndClosesLock.readLock().unlock(); + closeRegionOperation(); } } @@ -2263,24 +2207,24 @@ "after we renewed it. 
Could be caused by a very slow scanner " + "or a lengthy garbage collection"); } - if (closing.get() || closed.get()) { - close(); - throw new NotServingRegionException(regionInfo.getRegionNameAsString() + - " is closing=" + closing.get() + " or closed=" + closed.get()); - } + startRegionOperation(); + try { - // This could be a new thread from the last time we called next(). - ReadWriteConsistencyControl.setThreadReadPoint(this.readPt); + // This could be a new thread from the last time we called next(). + ReadWriteConsistencyControl.setThreadReadPoint(this.readPt); - results.clear(); - boolean returnResult = nextInternal(limit); + results.clear(); + boolean returnResult = nextInternal(limit); - outResults.addAll(results); - resetFilters(); - if (isFilterDone()) { - return false; + outResults.addAll(results); + resetFilters(); + if (isFilterDone()) { + return false; + } + return returnResult; + } finally { + closeRegionOperation(); } - return returnResult; } public synchronized boolean next(List outResults) @@ -2916,47 +2860,52 @@ checkRow(row); boolean flush = false; // Lock row - Integer lid = obtainRowLock(row); long result = amount; + startRegionOperation(); try { - Store store = stores.get(family); + Integer lid = obtainRowLock(row); + try { + Store store = stores.get(family); - // Get the old value: - Get get = new Get(row); - get.addColumn(family, qualifier); + // Get the old value: + Get get = new Get(row); + get.addColumn(family, qualifier); - List results = get(get); + List results = get(get); - if (!results.isEmpty()) { - KeyValue kv = results.get(0); - byte [] buffer = kv.getBuffer(); - int valueOffset = kv.getValueOffset(); - result += Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG); - } + if (!results.isEmpty()) { + KeyValue kv = results.get(0); + byte [] buffer = kv.getBuffer(); + int valueOffset = kv.getValueOffset(); + result += Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG); + } - // bulid the KeyValue now: - KeyValue newKv = new 
KeyValue(row, family, - qualifier, EnvironmentEdgeManager.currentTimeMillis(), - Bytes.toBytes(result)); + // build the KeyValue now: + KeyValue newKv = new KeyValue(row, family, + qualifier, EnvironmentEdgeManager.currentTimeMillis(), + Bytes.toBytes(result)); - // now log it: - if (writeToWAL) { - long now = EnvironmentEdgeManager.currentTimeMillis(); - WALEdit walEdit = new WALEdit(); - walEdit.add(newKv); - this.log.append(regionInfo, regionInfo.getTableDesc().getName(), - walEdit, now); - } + // now log it: + if (writeToWAL) { + long now = EnvironmentEdgeManager.currentTimeMillis(); + WALEdit walEdit = new WALEdit(); + walEdit.add(newKv); + this.log.append(regionInfo, regionInfo.getTableDesc().getName(), + walEdit, now); + } - // Now request the ICV to the store, this will set the timestamp - // appropriately depending on if there is a value in memcache or not. - // returns the - long size = store.updateColumnValue(row, family, qualifier, result); + // Now request the ICV to the store, this will set the timestamp + // appropriately depending on if there is a value in memcache or not. + // returns the size that is added to memstoreSize below + long size = store.updateColumnValue(row, family, qualifier, result); - size = this.memstoreSize.addAndGet(size); - flush = isFlushSize(size); + size = this.memstoreSize.addAndGet(size); + flush = isFlushSize(size); + } finally { + releaseRowLock(lid); + } } finally { - releaseRowLock(lid); + closeRegionOperation(); } if (flush) { @@ -2983,7 +2932,7 @@ public static final long FIXED_OVERHEAD = ClassSize.align( (4 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN + - (19 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT); + (18 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT); public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + ClassSize.OBJECT + (2 * ClassSize.ATOMIC_BOOLEAN) + @@ -3101,6 +3050,34 @@ } /** + * This method needs to be called before any public call that reads or + * modifies data. 
It has to be called just before a try. + * #closeRegionOperation needs to be called in the try's finally block. + * Fails fast if the region is closing, then acquires the read lock and re-checks that the region is not closed. + * @throws NotServingRegionException when the region is closing or closed + */ + private void startRegionOperation() throws NotServingRegionException { + if (this.closing.get()) { + throw new NotServingRegionException(regionInfo.getRegionNameAsString() + + " is closing"); + } + lock.readLock().lock(); + if (this.closed.get()) { + lock.readLock().unlock(); + throw new NotServingRegionException(regionInfo.getRegionNameAsString() + + " is closed"); + } + } + + /** + * Releases the read lock. This needs to be called in the finally block corresponding + * to the try block of #startRegionOperation + */ + private void closeRegionOperation(){ + lock.readLock().unlock(); + } + + /** + * A mocked list implementaion - discards all updates. */ private static final List MOCKED_LIST = new AbstractList() {