diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java index da326e3..717b808 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.util.List; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; @@ -122,13 +123,13 @@ public class DefaultStoreEngine extends StoreEngine< @Override public List compact(CompactionThroughputController throughputController) throws IOException { - return compact(throughputController, null); + return compact(throughputController, null, null); } @Override - public List compact(CompactionThroughputController throughputController, User user) - throws IOException { - return compactor.compact(request, throughputController, user); + public List compact(CompactionThroughputController throughputController, User user, + ReentrantReadWriteLock.ReadLock regionLock) throws IOException { + return compactor.compact(request, throughputController, user, regionLock); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index e987bc6..6e67e83 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1689,9 +1689,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } /* - * Do preparation for pending compaction. - * @throws IOException + * Do preparation for pending compaction. Hook exposed for test. */ + @VisibleForTesting protected void doRegionCompactionPrep() throws IOException { } @@ -1784,17 +1784,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } MonitoredTask status = null; boolean requestNeedsCancellation = true; - // block waiting for the lock for compaction - lock.readLock().lock(); - try { - byte[] cf = Bytes.toBytes(store.getColumnFamilyName()); - if (stores.get(cf) != store) { - LOG.warn("Store " + store.getColumnFamilyName() + " on region " + this - + " has been re-instantiated, cancel this compaction request. " - + " It may be caused by the roll back of split transaction"); - return false; - } + byte[] cf = Bytes.toBytes(store.getColumnFamilyName()); + if (stores.get(cf) != store) { + LOG.warn("Store " + store.getColumnFamilyName() + " on region " + this + + " has been re-instantiated, cancel this compaction request. 
" + + " It may be caused by the roll back of split transaction"); + return false; + } + + try { status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this); if (this.closed.get()) { String msg = "Skipping compaction on " + this + " because closed"; @@ -1843,12 +1842,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi status.markComplete("Compaction complete"); return true; } finally { - try { - if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction); - if (status != null) status.cleanup(); - } finally { - lock.readLock().unlock(); - } + if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction); + if (status != null) status.cleanup(); } } @@ -1955,8 +1950,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * Should the store be flushed because it is old enough. *

* Every FlushPolicy should call this to determine whether a store is old enough to flush(except - * that you always flush all stores). Otherwise the {@link #shouldFlush()} method will always - * returns true which will make a lot of flush requests. + * that you always flush all stores). Otherwise the {@link #shouldFlush(StringBuffer)} method + * will always return true which will make a lot of flush requests. */ boolean shouldFlushStore(Store store) { long earliest = this.wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 1d996b2..b9bd82d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1194,7 +1194,7 @@ public class HStore implements Store { * Since we already have this data, this will be idempotent but we will have a redundant * copy of the data. * - If RS fails between 2 and 3, the region will have a redundant copy of the data. The - * RS that failed won't be able to finish snyc() for WAL because of lease recovery in WAL. + * RS that failed won't be able to finish sync() for WAL because of lease recovery in WAL. * - If RS fails after 3, the region region server who opens the region will pick up the * the compaction marker from the WAL and replay it by removing the compaction input files. * Failed RS can also attempt to delete those files, but the operation will be idempotent @@ -1238,7 +1238,7 @@ public class HStore implements Store { + TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1)); // Commence the compaction. - List newFiles = compaction.compact(throughputController, user); + List newFiles = compaction.compact(throughputController, user, region.lock.readLock()); // TODO: get rid of this! if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) { @@ -1252,19 +1252,24 @@ public class HStore implements Store { } return sfs; } - // Do the steps necessary to complete the compaction. - sfs = moveCompatedFilesIntoPlace(cr, newFiles, user); - writeCompactionWalRecord(filesToCompact, sfs); - replaceStoreFiles(filesToCompact, sfs); - if (cr.isMajor()) { - majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs; - majorCompactedCellsSize += getCompactionProgress().totalCompactedSize; - } else { - compactedCellsCount += getCompactionProgress().totalCompactingKVs; - compactedCellsSize += getCompactionProgress().totalCompactedSize; + // Do the steps necessary to complete the compaction. Hold region open for these operations. + region.lock.readLock().lock(); + try { + sfs = moveCompatedFilesIntoPlace(cr, newFiles, user); + writeCompactionWalRecord(filesToCompact, sfs); + replaceStoreFiles(filesToCompact, sfs); + if (cr.isMajor()) { + majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs; + majorCompactedCellsSize += getCompactionProgress().totalCompactedSize; + } else { + compactedCellsCount += getCompactionProgress().totalCompactingKVs; + compactedCellsSize += getCompactionProgress().totalCompactedSize; + } + // At this point the store will use new files for all new scanners. + completeCompaction(filesToCompact, true); // Archive old files & update store size. + } finally { + region.lock.readLock().unlock(); } - // At this point the store will use new files for all new scanners. 
- completeCompaction(filesToCompact, true); // Archive old files & update store size. logCompactionEndMessage(cr, sfs, compactionStartTime); return sfs; @@ -1463,6 +1468,7 @@ public class HStore implements Store { * but instead makes a compaction candidate list by itself. * @param N Number of files. */ + @VisibleForTesting public void compactRecentForTestingAssumingDefaultPolicy(int N) throws IOException { List filesToCompact; boolean isMajor; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java index 3707290..1566d60 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -103,14 +104,14 @@ public class StripeStoreEngine extends StoreEngine compact(CompactionThroughputController throughputController) throws IOException { Preconditions.checkArgument(this.stripeRequest != null, "Cannot compact without selection"); - return this.stripeRequest.execute(compactor, throughputController, null); + return this.stripeRequest.execute(compactor, throughputController, null, null); } @Override - public List compact(CompactionThroughputController throughputController, User user) - throws IOException { + public List compact(CompactionThroughputController throughputController, User user, + ReentrantReadWriteLock.ReadLock regionLock) throws IOException { Preconditions.checkArgument(this.stripeRequest != null, "Cannot compact without selection"); - return this.stripeRequest.execute(compactor, throughputController, user); + return this.stripeRequest.execute(compactor, throughputController, user, regionLock); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java index cb16966..121a405 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver.compactions; import java.io.IOException; import java.util.List; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.fs.Path; @@ -67,13 +68,14 @@ public abstract class CompactionContext { /** * Runs the compaction based on current selection. select/forceSelect must have been called. + * @param throughputController Allows the compaction to be throttled. * @return The new file paths resulting from compaction. 
*/ public abstract List compact(CompactionThroughputController throughputController) throws IOException; - public abstract List compact(CompactionThroughputController throughputController, User user) - throws IOException; + public abstract List compact(CompactionThroughputController throughputController, User user, + ReentrantReadWriteLock.ReadLock regionLock) throws IOException; public CompactionRequest getRequest() { assert hasSelection(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java index 069d221..5f97b2e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java @@ -23,7 +23,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -38,7 +40,8 @@ import org.apache.hadoop.hbase.security.User; /** * Compact passed set of files. Create an instance and then call - * {@link #compact(CompactionRequest, CompactionThroughputController, User)} + * {@link #compact(CompactionRequest, CompactionThroughputController, User, + * ReentrantReadWriteLock.ReadLock)} */ @InterfaceAudience.Private public class DefaultCompactor extends Compactor { @@ -52,7 +55,8 @@ public class DefaultCompactor extends Compactor { * Do a minor/major compaction on an explicit set of storefiles from a Store. */ public List compact(final CompactionRequest request, - CompactionThroughputController throughputController, User user) throws IOException { + CompactionThroughputController throughputController, User user, + ReentrantReadWriteLock.ReadLock readLock) throws IOException { FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles()); this.progress = new CompactionProgress(fd.maxKeyCount); @@ -61,19 +65,24 @@ public class DefaultCompactor extends Compactor { List scanners; Collection readersToClose; - if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) { - // clone all StoreFiles, so we'll do the compaction on a independent copy of StoreFiles, - // HFileFiles, and their readers - readersToClose = new ArrayList(request.getFiles().size()); - for (StoreFile f : request.getFiles()) { - readersToClose.add(new StoreFile(f)); + readLock.lock(); // hold region open while we create scanners over storefiles. 
+ try { + if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) { + // clone all StoreFiles, so we'll do the compaction on a independent copy of StoreFiles, + // HFileFiles, and their readers + readersToClose = new ArrayList(request.getFiles().size()); + for (StoreFile f : request.getFiles()) { + readersToClose.add(new StoreFile(f)); + } + scanners = createFileScanners(readersToClose, smallestReadPoint, + store.throttleCompaction(request.getSize())); + } else { + readersToClose = Collections.emptyList(); + scanners = createFileScanners(request.getFiles(), smallestReadPoint, + store.throttleCompaction(request.getSize())); } - scanners = createFileScanners(readersToClose, smallestReadPoint, - store.throttleCompaction(request.getSize())); - } else { - readersToClose = Collections.emptyList(); - scanners = createFileScanners(request.getFiles(), smallestReadPoint, - store.throttleCompaction(request.getSize())); + } finally { + readLock.unlock(); } StoreFile.Writer writer = null; @@ -173,7 +182,8 @@ public class DefaultCompactor extends Compactor { /** * Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to - * {@link #compact(CompactionRequest, CompactionThroughputController, User)}; + * {@link #compact(CompactionRequest, CompactionThroughputController, User, + * ReentrantReadWriteLock.ReadLock)}; * @param filesToCompact the files to compact. These are used as the compactionSelection for * the generated {@link CompactionRequest}. * @param isMajor true to major compact (prune all deletes, max versions, etc) @@ -181,10 +191,12 @@ public class DefaultCompactor extends Compactor { * made it through the compaction. * @throws IOException */ + @VisibleForTesting public List compactForTesting(final Collection filesToCompact, boolean isMajor) throws IOException { CompactionRequest cr = new CompactionRequest(filesToCompact); cr.setIsMajor(isMajor, isMajor); - return this.compact(cr, NoLimitCompactionThroughputController.INSTANCE, null); + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + return this.compact(cr, NoLimitCompactionThroughputController.INSTANCE, null, lock.readLock()); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java index 5f024b8..35f0b67 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -393,7 +394,7 @@ public class StripeCompactionPolicy extends CompactionPolicy { public List execute(StripeCompactor compactor, CompactionThroughputController throughputController) throws IOException { - return execute(compactor, throughputController, null); + return execute(compactor, throughputController, null, null); } /** * Executes the request against compactor (essentially, just calls correct overload of @@ -402,7 +403,8 @@ public class StripeCompactionPolicy extends CompactionPolicy { * @return result of compact(...) 
*/ public abstract List execute(StripeCompactor compactor, - CompactionThroughputController throughputController, User user) throws IOException; + CompactionThroughputController throughputController, User user, + ReentrantReadWriteLock.ReadLock regionLock) throws IOException; public StripeCompactionRequest(CompactionRequest request) { this.request = request; @@ -454,9 +456,10 @@ public class StripeCompactionPolicy extends CompactionPolicy { @Override public List execute(StripeCompactor compactor, - CompactionThroughputController throughputController, User user) throws IOException { + CompactionThroughputController throughputController, User user, + ReentrantReadWriteLock.ReadLock regionLock) throws IOException { return compactor.compact(this.request, this.targetBoundaries, this.majorRangeFromRow, - this.majorRangeToRow, throughputController, user); + this.majorRangeToRow, throughputController, user, regionLock); } } @@ -505,9 +508,11 @@ public class StripeCompactionPolicy extends CompactionPolicy { @Override public List execute(StripeCompactor compactor, - CompactionThroughputController throughputController, User user) throws IOException { + CompactionThroughputController throughputController, User user, + ReentrantReadWriteLock.ReadLock regionLock) throws IOException { return compactor.compact(this.request, this.targetCount, this.targetKvs, this.startRow, - this.endRow, this.majorRangeFromRow, this.majorRangeToRow, throughputController, user); + this.endRow, this.majorRangeFromRow, this.majorRangeToRow, throughputController, user, + regionLock); } /** Set major range of the compaction to the entire compaction range. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java index 021965c..f7239b9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java @@ -22,6 +22,7 @@ import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -53,14 +54,16 @@ public class StripeCompactor extends Compactor { public List compact(CompactionRequest request, List targetBoundaries, byte[] majorRangeFromRow, byte[] majorRangeToRow, - CompactionThroughputController throughputController) throws IOException { + CompactionThroughputController throughputController, + ReentrantReadWriteLock.ReadLock regionLock) throws IOException { return compact(request, targetBoundaries, majorRangeFromRow, majorRangeToRow, - throughputController, null); + throughputController, null, regionLock); } public List compact(CompactionRequest request, List targetBoundaries, byte[] majorRangeFromRow, byte[] majorRangeToRow, - CompactionThroughputController throughputController, User user) throws IOException { + CompactionThroughputController throughputController, User user, + ReentrantReadWriteLock.ReadLock regionLock) throws IOException { if (LOG.isDebugEnabled()) { StringBuilder sb = new StringBuilder(); sb.append("Executing compaction with " + targetBoundaries.size() + " boundaries:"); @@ -72,19 +75,21 @@ public class StripeCompactor extends Compactor { StripeMultiFileWriter writer = new StripeMultiFileWriter.BoundaryMultiWriter( 
targetBoundaries, majorRangeFromRow, majorRangeToRow); return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow, - throughputController, user); + throughputController, user, regionLock); } public List compact(CompactionRequest request, int targetCount, long targetSize, byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow, - CompactionThroughputController throughputController) throws IOException { + CompactionThroughputController throughputController, + ReentrantReadWriteLock.ReadLock regionLock) throws IOException { return compact(request, targetCount, targetSize, left, right, majorRangeFromRow, - majorRangeToRow, throughputController, null); + majorRangeToRow, throughputController, null, regionLock); } public List compact(CompactionRequest request, int targetCount, long targetSize, byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow, - CompactionThroughputController throughputController, User user) throws IOException { + CompactionThroughputController throughputController, User user, + ReentrantReadWriteLock.ReadLock regionLock) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("Executing compaction with " + targetSize + " target file size, no more than " + targetCount + " files, in [" @@ -93,19 +98,27 @@ public class StripeCompactor extends Compactor { StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter( targetCount, targetSize, left, right); return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow, - throughputController, user); + throughputController, user, regionLock); } private List compactInternal(StripeMultiFileWriter mw, final CompactionRequest request, byte[] majorRangeFromRow, byte[] majorRangeToRow, - CompactionThroughputController throughputController, User user) throws IOException { + CompactionThroughputController throughputController, User user, + ReentrantReadWriteLock.ReadLock regionLock) throws IOException { final Collection filesToCompact = request.getFiles(); final FileDetails fd = getFileDetails(filesToCompact, request.isMajor()); this.progress = new CompactionProgress(fd.maxKeyCount); long smallestReadPoint = getSmallestReadPoint(); - List scanners = createFileScanners(filesToCompact, + List scanners = null; + regionLock.lock(); + try { + // TODO: does not respect hbase.regionserver.compaction.private.readers. 
+ scanners = createFileScanners(filesToCompact, smallestReadPoint, store.throttleCompaction(request.getSize())); + } finally { + regionLock.unlock(); + } boolean finished = false; InternalScanner scanner = null; @@ -178,7 +191,13 @@ public class StripeCompactor extends Compactor { } assert finished : "We should have exited the method on all error paths"; - List newFiles = mw.commitWriters(fd.maxSeqId, request.isMajor()); + List newFiles = null; + regionLock.lock(); + try { + newFiles = mw.commitWriters(fd.maxSeqId, request.isMajor()); + } finally { + regionLock.unlock(); + } assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata."; return newFiles; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java index b374bdc..74e63a3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java @@ -36,6 +36,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; @@ -365,12 +366,12 @@ public class TestCompaction { @Override public List compact(CompactionThroughputController throughputController) throws IOException { - return compact(throughputController, null); + return compact(throughputController, null, null); } @Override - public List compact(CompactionThroughputController throughputController, User user) - throws IOException { + public List compact(CompactionThroughputController throughputController, User user, + ReentrantReadWriteLock.ReadLock regionLock) throws IOException { finishCompaction(this.selectedFiles); return new ArrayList(); } @@ -423,12 +424,12 @@ public class TestCompaction { @Override public List compact(CompactionThroughputController throughputController) throws IOException { - return compact(throughputController, null); + return compact(throughputController, null, null); } @Override - public List compact(CompactionThroughputController throughputController, User user) - throws IOException { + public List compact(CompactionThroughputController throughputController, User user, + ReentrantReadWriteLock.ReadLock regionLock) throws IOException { try { isInCompact = true; synchronized (this) { @@ -512,8 +513,10 @@ public class TestCompaction { r.compact(any(CompactionContext.class), any(Store.class), any(CompactionThroughputController.class), any(User.class))).then(new Answer() { public Boolean answer(InvocationOnMock invocation) throws Throwable { + ReentrantReadWriteLock dummyLock = new ReentrantReadWriteLock(); invocation.getArgumentAt(0, CompactionContext.class).compact( - invocation.getArgumentAt(2, CompactionThroughputController.class)); + invocation.getArgumentAt(2, CompactionThroughputController.class), + null, dummyLock.readLock()); return true; } }); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java index 47b6b5c..c7545a1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java @@ -57,7 +57,6 @@ import 
org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.mob.MobConstants; import org.apache.hadoop.hbase.mob.MobUtils; import org.apache.hadoop.hbase.monitoring.MonitoredTask; -import org.apache.hadoop.hbase.regionserver.StoreFile.Reader; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; import org.apache.hadoop.hbase.security.EncryptionUtil; @@ -144,7 +143,7 @@ public class TestHMobStore { private void init(String methodName, Configuration conf, HTableDescriptor htd, HColumnDescriptor hcd, boolean testStore) throws IOException { - //Setting up tje Region and Store + // Setting up the Region and Store Path basedir = new Path(DIR+methodName); Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName()); String logName = "logs"; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java index 22e91f0..65c89e7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java @@ -17,7 +17,16 @@ */ package org.apache.hadoop.hbase.regionserver; -import com.google.common.collect.Lists; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -42,6 +51,9 @@ import org.apache.hadoop.hbase.client.RpcRetryingCaller; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; import org.apache.hadoop.hbase.io.hfile.CacheConfig; @@ -60,20 +72,12 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALKey; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; import com.google.common.collect.Lists; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - -import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertThat; - /** * Tests bulk loading of HFiles and shows the atomicity or lack of atomicity of * the region server's bullkLoad functionality. @@ -95,6 +99,11 @@ public class TestHRegionServerBulkLoad { } } + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf.setInt("hbase.rpc.timeout", 10 * 1000); + } + /** * Create a rowkey compatible with * {@link #createHFile(FileSystem, Path, byte[], byte[], byte[], int)}. 
@@ -211,6 +220,22 @@ public class TestHRegionServerBulkLoad { } } + public static class MyObserver extends BaseRegionObserver { + @Override + public InternalScanner preCompact(ObserverContext e, + final Store store, final InternalScanner scanner, final ScanType scanType) + throws IOException { + try { + Thread.sleep(30000); + } catch (InterruptedException ie) { + IOException ioe = new InterruptedIOException(); + ioe.initCause(ie); + throw ioe; + } + return scanner; + } + } + /** * Thread that does full scans of the table looking for any partially * completed rows. @@ -278,6 +303,7 @@ public class TestHRegionServerBulkLoad { try { LOG.info("Creating table " + table); HTableDescriptor htd = new HTableDescriptor(table); + htd.addCoprocessor(MyObserver.class.getName()); for (int i = 0; i < 10; i++) { htd.addFamily(new HColumnDescriptor(family(i))); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java index eb8513b..f5b01bb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java @@ -37,6 +37,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.TreeMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -129,11 +130,12 @@ public class TestStripeCompactor { public static void verifyBoundaryCompaction(KeyValue[] input, byte[][] boundaries, KeyValue[][] output, byte[] majorFrom, byte[] majorTo, boolean allFiles) throws Exception { + ReentrantReadWriteLock dummyLock = new ReentrantReadWriteLock(); StoreFileWritersCapture writers = new StoreFileWritersCapture(); StripeCompactor sc = createCompactor(writers, input); List paths = sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom, majorTo, - NoLimitCompactionThroughputController.INSTANCE); + NoLimitCompactionThroughputController.INSTANCE, dummyLock.readLock()); writers.verifyKvs(output, allFiles, true); if (allFiles) { assertEquals(output.length, paths.size()); @@ -166,11 +168,12 @@ public class TestStripeCompactor { public static void verifySizeCompaction(KeyValue[] input, int targetCount, long targetSize, byte[] left, byte[] right, KeyValue[][] output) throws Exception { + ReentrantReadWriteLock dummyLock = new ReentrantReadWriteLock(); StoreFileWritersCapture writers = new StoreFileWritersCapture(); StripeCompactor sc = createCompactor(writers, input); List paths = sc.compact(createDummyRequest(), targetCount, targetSize, left, right, null, null, - NoLimitCompactionThroughputController.INSTANCE); + NoLimitCompactionThroughputController.INSTANCE, dummyLock.readLock()); assertEquals(output.length, paths.size()); writers.verifyKvs(output, true, true); List boundaries = new ArrayList(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java index 1454aa8..3944a0d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java @@ -30,6 +30,7 @@ import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.Arrays; +import 
java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -77,7 +78,8 @@ public class TestStripeStoreEngine { when( mockCompactor.compact(any(CompactionRequest.class), anyInt(), anyLong(), any(byte[].class), any(byte[].class), any(byte[].class), any(byte[].class), - any(CompactionThroughputController.class), any(User.class))) + any(CompactionThroughputController.class), any(User.class), + any(ReentrantReadWriteLock.ReadLock.class))) .thenReturn(new ArrayList()); // Produce 3 L0 files. @@ -96,10 +98,11 @@ public class TestStripeStoreEngine { assertEquals(2, compaction.getRequest().getFiles().size()); assertFalse(compaction.getRequest().getFiles().contains(sf)); // Make sure the correct method it called on compactor. - compaction.compact(NoLimitCompactionThroughputController.INSTANCE); + ReentrantReadWriteLock dummyLock = new ReentrantReadWriteLock(); + compaction.compact(NoLimitCompactionThroughputController.INSTANCE, null, dummyLock.readLock()); verify(mockCompactor, times(1)).compact(compaction.getRequest(), targetCount, 0L, StripeStoreFileManager.OPEN_KEY, StripeStoreFileManager.OPEN_KEY, null, null, - NoLimitCompactionThroughputController.INSTANCE, null); + NoLimitCompactionThroughputController.INSTANCE, null, dummyLock.readLock()); } private static StoreFile createFile() throws Exception { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java index 56e71e8..a2c2a1a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java @@ -42,6 +42,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -216,10 +217,11 @@ public class TestStripeCompactionPolicy { assertTrue(policy.needsCompactions(si, al())); StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); assertEquals(si.getStorefiles(), scr.getRequest().getFiles()); - scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null); + ReentrantReadWriteLock dummyLock = new ReentrantReadWriteLock(); + scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null, dummyLock.readLock()); verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY), - any(NoLimitCompactionThroughputController.class), any(User.class)); + any(NoLimitCompactionThroughputController.class), any(User.class), eq(dummyLock.readLock())); } @Test @@ -387,11 +389,11 @@ public class TestStripeCompactionPolicy { // Merge all three expired stripes into one. StripeCompactionPolicy.StripeInformationProvider si = createStripesWithFiles(expired, expired, expired); - verifyMergeCompatcion(policy, si, 0, 2); + verifyMergeCompaction(policy, si, 0, 2); // Merge two adjacent expired stripes into one. 
si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired); - verifyMergeCompatcion(policy, si, 3, 4); + verifyMergeCompaction(policy, si, 3, 4); } finally { EnvironmentEdgeManager.reset(); } @@ -460,7 +462,7 @@ public class TestStripeCompactionPolicy { return new ArrayList(Arrays.asList(sfs)); } - private void verifyMergeCompatcion(StripeCompactionPolicy policy, StripeInformationProvider si, + private void verifyMergeCompaction(StripeCompactionPolicy policy, StripeInformationProvider si, int from, int to) throws Exception { StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); Collection sfs = getAllFiles(si, from, to); @@ -469,7 +471,9 @@ public class TestStripeCompactionPolicy { // All the Stripes are expired, so the Compactor will not create any Writers. We need to create // an empty file to preserve metadata StripeCompactor sc = createCompactor(); - List paths = scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null); + ReentrantReadWriteLock dummyLock = new ReentrantReadWriteLock(); + List paths = scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null, + dummyLock.readLock()); assertEquals(1, paths.size()); } @@ -528,7 +532,8 @@ public class TestStripeCompactionPolicy { assertTrue(policy.needsCompactions(si, al())); StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); verifyCollectionsEqual(sfs, scr.getRequest().getFiles()); - scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null); + ReentrantReadWriteLock dummyLock = new ReentrantReadWriteLock(); + scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null, dummyLock.readLock()); verify(sc, times(1)).compact(eq(scr.getRequest()), argThat(new ArgumentMatcher>() { @Override public boolean matches(Object argument) { @@ -542,7 +547,7 @@ public class TestStripeCompactionPolicy { } }), dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom), dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo), - any(NoLimitCompactionThroughputController.class), any(User.class)); + any(NoLimitCompactionThroughputController.class), any(User.class), eq(dummyLock.readLock())); } /** @@ -563,12 +568,13 @@ public class TestStripeCompactionPolicy { assertTrue(!needsCompaction || policy.needsCompactions(si, al())); StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); verifyCollectionsEqual(sfs, scr.getRequest().getFiles()); - scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null); + ReentrantReadWriteLock dummyLock = new ReentrantReadWriteLock(); + scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null, dummyLock.readLock()); verify(sc, times(1)).compact(eq(scr.getRequest()), count == null ? anyInt() : eq(count.intValue()), size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end), dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end), - any(NoLimitCompactionThroughputController.class), any(User.class)); + any(NoLimitCompactionThroughputController.class), any(User.class), eq(dummyLock.readLock())); } /** Verify arbitrary flush. */
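
The patch threads HRegion's region lock (`region.lock.readLock()`) from HStore down into the compactors so that the read lock is no longer held for the entire compaction, only around the short phases that need the region to stay open: opening scanners over the input store files (DefaultCompactor/StripeCompactor) and committing the resulting files (HStore/StripeCompactor). Below is a minimal, self-contained sketch of that three-phase pattern; the class, method, and file-name types are illustrative only and are not HBase APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Illustrative sketch (not HBase code) of the locking pattern the patch applies:
 * hold the region read lock only around the short sections that require the region
 * to stay open, and run the long merge/rewrite phase without it, so writers that
 * need the write lock (e.g. bulk load, region close) are not starved by a long
 * compaction.
 */
public class RegionReadLockPatternSketch {

  private final ReentrantReadWriteLock regionLock = new ReentrantReadWriteLock();

  /** Stand-in for a compaction over a set of input file names. */
  public List<String> compact(List<String> inputFiles) {
    final List<String> readers;
    // Phase 1: open readers over the input files while holding the read lock,
    // mirroring the createFileScanners() calls wrapped by readLock in the patch.
    regionLock.readLock().lock();
    try {
      readers = new ArrayList<>(inputFiles);
    } finally {
      regionLock.readLock().unlock();
    }

    // Phase 2: the expensive merge/rewrite runs WITHOUT the region lock held.
    List<String> newFiles = new ArrayList<>();
    for (String f : readers) {
      newFiles.add(f + ".compacted"); // placeholder for the real rewrite work
    }

    // Phase 3: re-take the read lock for the short commit step, as HStore does
    // around moving the compacted files into place, writing the compaction WAL
    // record, and replacing the store files.
    regionLock.readLock().lock();
    try {
      return newFiles;
    } finally {
      regionLock.readLock().unlock();
    }
  }
}
```

Callers that previously used the two-argument `compact(throughputController, user)` overload now pass a read lock as well, which is why the test changes above construct a standalone `ReentrantReadWriteLock` and hand in its `readLock()` as a dummy.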