diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java index 51e1a2d..22b82c6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.util.List; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; @@ -119,9 +120,10 @@ public class DefaultStoreEngine extends StoreEngine< } @Override - public List compact(CompactionThroughputController throughputController) + public List compact(CompactionThroughputController throughputController, + ReentrantReadWriteLock.ReadLock regionLock) throws IOException { - return compactor.compact(request, throughputController); + return compactor.compact(request, throughputController, regionLock); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index b6cdd29..cf97c24 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1687,9 +1687,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } /* - * Do preparation for pending compaction. - * @throws IOException + * Do preparation for pending compaction. Hook exposed for test. */ + @VisibleForTesting protected void doRegionCompactionPrep() throws IOException { } @@ -1776,17 +1776,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } MonitoredTask status = null; boolean requestNeedsCancellation = true; - // block waiting for the lock for compaction - lock.readLock().lock(); - try { - byte[] cf = Bytes.toBytes(store.getColumnFamilyName()); - if (stores.get(cf) != store) { - LOG.warn("Store " + store.getColumnFamilyName() + " on region " + this - + " has been re-instantiated, cancel this compaction request. " - + " It may be caused by the roll back of split transaction"); - return false; - } + byte[] cf = Bytes.toBytes(store.getColumnFamilyName()); + if (stores.get(cf) != store) { + LOG.warn("Store " + store.getColumnFamilyName() + " on region " + this + + " has been re-instantiated, cancel this compaction request. " + + " It may be caused by the roll back of split transaction"); + return false; + } + + try { status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this); if (this.closed.get()) { String msg = "Skipping compaction on " + this + " because closed"; @@ -1835,12 +1834,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi status.markComplete("Compaction complete"); return true; } finally { - try { - if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction); - if (status != null) status.cleanup(); - } finally { - lock.readLock().unlock(); - } + if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction); + if (status != null) status.cleanup(); } } @@ -1947,8 +1942,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * Should the store be flushed because it is old enough. *

* Every FlushPolicy should call this to determine whether a store is old enough to flush(except - * that you always flush all stores). Otherwise the {@link #shouldFlush()} method will always - * returns true which will make a lot of flush requests. + * that you always flush all stores). Otherwise the {@link #shouldFlush(StringBuffer)} method + * will always return true which will make a lot of flush requests. */ boolean shouldFlushStore(Store store) { long earliest = this.wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index cfda1c6..2336588 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1192,7 +1192,7 @@ public class HStore implements Store { * Since we already have this data, this will be idempotent but we will have a redundant * copy of the data. * - If RS fails between 2 and 3, the region will have a redundant copy of the data. The - * RS that failed won't be able to finish snyc() for WAL because of lease recovery in WAL. + * RS that failed won't be able to finish sync() for WAL because of lease recovery in WAL. * - If RS fails after 3, the region region server who opens the region will pick up the * the compaction marker from the WAL and replay it by removing the compaction input files. * Failed RS can also attempt to delete those files, but the operation will be idempotent @@ -1230,7 +1230,7 @@ public class HStore implements Store { + TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1)); // Commence the compaction. - List newFiles = compaction.compact(throughputController); + List newFiles = compaction.compact(throughputController, region.lock.readLock()); // TODO: get rid of this! if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) { @@ -1244,19 +1244,24 @@ public class HStore implements Store { } return sfs; } - // Do the steps necessary to complete the compaction. - sfs = moveCompatedFilesIntoPlace(cr, newFiles); - writeCompactionWalRecord(filesToCompact, sfs); - replaceStoreFiles(filesToCompact, sfs); - if (cr.isMajor()) { - majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs; - majorCompactedCellsSize += getCompactionProgress().totalCompactedSize; - } else { - compactedCellsCount += getCompactionProgress().totalCompactingKVs; - compactedCellsSize += getCompactionProgress().totalCompactedSize; + // Do the steps necessary to complete the compaction. Hold region open for these operations. + region.lock.readLock().lock(); + try { + sfs = moveCompatedFilesIntoPlace(cr, newFiles); + writeCompactionWalRecord(filesToCompact, sfs); + replaceStoreFiles(filesToCompact, sfs); + if (cr.isMajor()) { + majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs; + majorCompactedCellsSize += getCompactionProgress().totalCompactedSize; + } else { + compactedCellsCount += getCompactionProgress().totalCompactingKVs; + compactedCellsSize += getCompactionProgress().totalCompactedSize; + } + // At this point the store will use new files for all new scanners. + completeCompaction(filesToCompact, true); // Archive old files & update store size. + } finally { + region.lock.readLock().unlock(); } - // At this point the store will use new files for all new scanners. 
- completeCompaction(filesToCompact, true); // Archive old files & update store size. logCompactionEndMessage(cr, sfs, compactionStartTime); return sfs; @@ -1438,6 +1443,7 @@ public class HStore implements Store { * but instead makes a compaction candidate list by itself. * @param N Number of files. */ + @VisibleForTesting public void compactRecentForTestingAssumingDefaultPolicy(int N) throws IOException { List filesToCompact; boolean isMajor; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java index 26339a3..565fbb3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -99,10 +100,10 @@ public class StripeStoreEngine extends StoreEngine compact(CompactionThroughputController throughputController) - throws IOException { + public List compact(CompactionThroughputController throughputController, + ReentrantReadWriteLock.ReadLock regionLock) throws IOException { Preconditions.checkArgument(this.stripeRequest != null, "Cannot compact without selection"); - return this.stripeRequest.execute(compactor, throughputController); + return this.stripeRequest.execute(compactor, throughputController, regionLock); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java index 1c89bf0..0b95659 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver.compactions; import java.io.IOException; import java.util.List; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.fs.Path; @@ -66,10 +67,12 @@ public abstract class CompactionContext { /** * Runs the compaction based on current selection. select/forceSelect must have been called. + * @param throughputController Allows the compaction to be throttled. + * @param regionLock used to hold the region open during critical sections. * @return The new file paths resulting from compaction. 
*/ - public abstract List compact(CompactionThroughputController throughputController) - throws IOException; + public abstract List compact(CompactionThroughputController throughputController, + ReentrantReadWriteLock.ReadLock regionLock) throws IOException; public CompactionRequest getRequest() { assert hasSelection(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java index f26f4fe..be9babd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java @@ -23,7 +23,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -37,7 +39,8 @@ import org.apache.hadoop.hbase.regionserver.StoreFileScanner; /** * Compact passed set of files. Create an instance and then call - * {@link #compact(CompactionRequest, CompactionThroughputController)} + * {@link #compact(CompactionRequest, CompactionThroughputController, + * ReentrantReadWriteLock.ReadLock)} */ @InterfaceAudience.Private public class DefaultCompactor extends Compactor { @@ -51,7 +54,8 @@ public class DefaultCompactor extends Compactor { * Do a minor/major compaction on an explicit set of storefiles from a Store. */ public List compact(final CompactionRequest request, - CompactionThroughputController throughputController) throws IOException { + CompactionThroughputController throughputController, + ReentrantReadWriteLock.ReadLock readLock) throws IOException { FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles()); this.progress = new CompactionProgress(fd.maxKeyCount); @@ -60,19 +64,24 @@ public class DefaultCompactor extends Compactor { List scanners; Collection readersToClose; - if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) { - // clone all StoreFiles, so we'll do the compaction on a independent copy of StoreFiles, - // HFileFiles, and their readers - readersToClose = new ArrayList(request.getFiles().size()); - for (StoreFile f : request.getFiles()) { - readersToClose.add(new StoreFile(f)); + readLock.lock(); // hold region open while we create scanners over storefiles. 
+ try { + if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) { + // clone all StoreFiles, so we'll do the compaction on a independent copy of StoreFiles, + // HFileFiles, and their readers + readersToClose = new ArrayList(request.getFiles().size()); + for (StoreFile f : request.getFiles()) { + readersToClose.add(new StoreFile(f)); + } + scanners = createFileScanners(readersToClose, smallestReadPoint, + store.throttleCompaction(request.getSize())); + } else { + readersToClose = Collections.emptyList(); + scanners = createFileScanners(request.getFiles(), smallestReadPoint, + store.throttleCompaction(request.getSize())); } - scanners = createFileScanners(readersToClose, smallestReadPoint, - store.throttleCompaction(request.getSize())); - } else { - readersToClose = Collections.emptyList(); - scanners = createFileScanners(request.getFiles(), smallestReadPoint, - store.throttleCompaction(request.getSize())); + } finally { + readLock.unlock(); } StoreFile.Writer writer = null; @@ -172,7 +181,8 @@ public class DefaultCompactor extends Compactor { /** * Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to - * {@link #compact(CompactionRequest, CompactionThroughputController)}; + * {@link #compact(CompactionRequest, CompactionThroughputController, + * ReentrantReadWriteLock.ReadLock)}; * @param filesToCompact the files to compact. These are used as the compactionSelection for * the generated {@link CompactionRequest}. * @param isMajor true to major compact (prune all deletes, max versions, etc) @@ -180,10 +190,12 @@ public class DefaultCompactor extends Compactor { * made it through the compaction. * @throws IOException */ + @VisibleForTesting public List compactForTesting(final Collection filesToCompact, boolean isMajor) throws IOException { CompactionRequest cr = new CompactionRequest(filesToCompact); cr.setIsMajor(isMajor, isMajor); - return this.compact(cr, NoLimitCompactionThroughputController.INSTANCE); + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + return this.compact(cr, NoLimitCompactionThroughputController.INSTANCE, lock.readLock()); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java index 0d49f09..fa14f30 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -397,7 +398,8 @@ public class StripeCompactionPolicy extends CompactionPolicy { * @return result of compact(...) 
*/ public abstract List execute(StripeCompactor compactor, - CompactionThroughputController throughputController) throws IOException; + CompactionThroughputController throughputController, + ReentrantReadWriteLock.ReadLock regionLock) throws IOException; public StripeCompactionRequest(CompactionRequest request) { this.request = request; @@ -449,9 +451,10 @@ public class StripeCompactionPolicy extends CompactionPolicy { @Override public List execute(StripeCompactor compactor, - CompactionThroughputController throughputController) throws IOException { + CompactionThroughputController throughputController, + ReentrantReadWriteLock.ReadLock regionLock) throws IOException { return compactor.compact(this.request, this.targetBoundaries, this.majorRangeFromRow, - this.majorRangeToRow, throughputController); + this.majorRangeToRow, throughputController, regionLock); } } @@ -500,9 +503,11 @@ public class StripeCompactionPolicy extends CompactionPolicy { @Override public List execute(StripeCompactor compactor, - CompactionThroughputController throughputController) throws IOException { + CompactionThroughputController throughputController, + ReentrantReadWriteLock.ReadLock regionLock) throws IOException { return compactor.compact(this.request, this.targetCount, this.targetKvs, this.startRow, - this.endRow, this.majorRangeFromRow, this.majorRangeToRow, throughputController); + this.endRow, this.majorRangeFromRow, this.majorRangeToRow, throughputController, + regionLock); } /** Set major range of the compaction to the entire compaction range. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java index 6814b8c..d888509 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java @@ -22,6 +22,7 @@ import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -52,7 +53,8 @@ public class StripeCompactor extends Compactor { public List compact(CompactionRequest request, List targetBoundaries, byte[] majorRangeFromRow, byte[] majorRangeToRow, - CompactionThroughputController throughputController) throws IOException { + CompactionThroughputController throughputController, + ReentrantReadWriteLock.ReadLock regionLock) throws IOException { if (LOG.isDebugEnabled()) { StringBuilder sb = new StringBuilder(); sb.append("Executing compaction with " + targetBoundaries.size() + " boundaries:"); @@ -64,12 +66,13 @@ public class StripeCompactor extends Compactor { StripeMultiFileWriter writer = new StripeMultiFileWriter.BoundaryMultiWriter( targetBoundaries, majorRangeFromRow, majorRangeToRow); return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow, - throughputController); + throughputController, regionLock); } public List compact(CompactionRequest request, int targetCount, long targetSize, byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow, - CompactionThroughputController throughputController) throws IOException { + CompactionThroughputController throughputController, + ReentrantReadWriteLock.ReadLock regionLock) throws IOException { if (LOG.isDebugEnabled()) { 
LOG.debug("Executing compaction with " + targetSize + " target file size, no more than " + targetCount + " files, in [" @@ -78,19 +81,27 @@ public class StripeCompactor extends Compactor { StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter( targetCount, targetSize, left, right); return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow, - throughputController); + throughputController, regionLock); } private List compactInternal(StripeMultiFileWriter mw, final CompactionRequest request, byte[] majorRangeFromRow, byte[] majorRangeToRow, - CompactionThroughputController throughputController) throws IOException { + CompactionThroughputController throughputController, + ReentrantReadWriteLock.ReadLock regionLock) throws IOException { final Collection filesToCompact = request.getFiles(); final FileDetails fd = getFileDetails(filesToCompact, request.isMajor()); this.progress = new CompactionProgress(fd.maxKeyCount); long smallestReadPoint = getSmallestReadPoint(); - List scanners = createFileScanners(filesToCompact, - smallestReadPoint, store.throttleCompaction(request.getSize())); + List scanners = null; + regionLock.lock(); + try { + // TODO: does not respect hbase.regionserver.compaction.private.readers. + scanners = createFileScanners(filesToCompact, + smallestReadPoint, store.throttleCompaction(request.getSize())); + } finally { + regionLock.unlock(); + } boolean finished = false; InternalScanner scanner = null; @@ -163,7 +174,13 @@ public class StripeCompactor extends Compactor { } assert finished : "We should have exited the method on all error paths"; - List newFiles = mw.commitWriters(fd.maxSeqId, request.isMajor()); + List newFiles = null; + regionLock.lock(); + try { + newFiles = mw.commitWriters(fd.maxSeqId, request.isMajor()); + } finally { + regionLock.unlock(); + } assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata."; return newFiles; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java index a377325..a2fefe6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java @@ -36,6 +36,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -365,8 +366,8 @@ public class TestCompaction { } @Override - public List compact(CompactionThroughputController throughputController) - throws IOException { + public List compact(CompactionThroughputController throughputController, + ReentrantReadWriteLock.ReadLock regionLock) throws IOException { finishCompaction(this.selectedFiles); return new ArrayList(); } @@ -417,8 +418,8 @@ public class TestCompaction { } @Override - public List compact(CompactionThroughputController throughputController) - throws IOException { + public List compact(CompactionThroughputController throughputController, + ReentrantReadWriteLock.ReadLock regionLock) throws IOException { try { isInCompact = true; synchronized (this) { @@ -502,8 +503,9 @@ public class TestCompaction { r.compact(any(CompactionContext.class), any(Store.class), any(CompactionThroughputController.class))).then(new Answer() { public Boolean 
answer(InvocationOnMock invocation) throws Throwable { + ReentrantReadWriteLock dummyLock = new ReentrantReadWriteLock(); invocation.getArgumentAt(0, CompactionContext.class).compact( - invocation.getArgumentAt(2, CompactionThroughputController.class)); + invocation.getArgumentAt(2, CompactionThroughputController.class), dummyLock.readLock()); return true; } }); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java index 2cb3b38..c7545a1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java @@ -36,7 +36,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.HarFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; @@ -58,9 +57,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.mob.MobConstants; import org.apache.hadoop.hbase.mob.MobUtils; import org.apache.hadoop.hbase.monitoring.MonitoredTask; -import org.apache.hadoop.hbase.regionserver.StoreFile.Reader; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController; import org.apache.hadoop.hbase.security.EncryptionUtil; import org.apache.hadoop.hbase.security.User; @@ -146,7 +143,7 @@ public class TestHMobStore { private void init(String methodName, Configuration conf, HTableDescriptor htd, HColumnDescriptor hcd, boolean testStore) throws IOException { - //Setting up tje Region and Store + // Setting up the Region and Store Path basedir = new Path(DIR+methodName); Path tableDir = FSUtils.getTableDir(basedir, htd.getTableName()); String logName = "logs"; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java index 6ef6336..88bfea5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java @@ -37,6 +37,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.TreeMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -129,11 +130,12 @@ public class TestStripeCompactor { public static void verifyBoundaryCompaction(KeyValue[] input, byte[][] boundaries, KeyValue[][] output, byte[] majorFrom, byte[] majorTo, boolean allFiles) throws Exception { + ReentrantReadWriteLock dummyLock = new ReentrantReadWriteLock(); StoreFileWritersCapture writers = new StoreFileWritersCapture(); StripeCompactor sc = createCompactor(writers, input); List paths = sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom, majorTo, - NoLimitCompactionThroughputController.INSTANCE); + NoLimitCompactionThroughputController.INSTANCE, dummyLock.readLock()); writers.verifyKvs(output, allFiles, true); if (allFiles) { assertEquals(output.length, paths.size()); @@ -166,11 +168,12 
@@ public class TestStripeCompactor { public static void verifySizeCompaction(KeyValue[] input, int targetCount, long targetSize, byte[] left, byte[] right, KeyValue[][] output) throws Exception { + ReentrantReadWriteLock dummyLock = new ReentrantReadWriteLock(); StoreFileWritersCapture writers = new StoreFileWritersCapture(); StripeCompactor sc = createCompactor(writers, input); List paths = sc.compact(createDummyRequest(), targetCount, targetSize, left, right, null, null, - NoLimitCompactionThroughputController.INSTANCE); + NoLimitCompactionThroughputController.INSTANCE, dummyLock.readLock()); assertEquals(output.length, paths.size()); writers.verifyKvs(output, true, true); List boundaries = new ArrayList(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java index d9e3ea3..3a2cec4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java @@ -30,6 +30,7 @@ import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.Arrays; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -76,7 +77,8 @@ public class TestStripeStoreEngine { when( mockCompactor.compact(any(CompactionRequest.class), anyInt(), anyLong(), any(byte[].class), any(byte[].class), any(byte[].class), any(byte[].class), - any(CompactionThroughputController.class))).thenReturn(new ArrayList()); + any(CompactionThroughputController.class), any(ReentrantReadWriteLock.ReadLock.class))) + .thenReturn(new ArrayList()); // Produce 3 L0 files. StoreFile sf = createFile(); @@ -94,10 +96,11 @@ public class TestStripeStoreEngine { assertEquals(2, compaction.getRequest().getFiles().size()); assertFalse(compaction.getRequest().getFiles().contains(sf)); // Make sure the correct method it called on compactor. 
- compaction.compact(NoLimitCompactionThroughputController.INSTANCE); + ReentrantReadWriteLock dummyLock = new ReentrantReadWriteLock(); + compaction.compact(NoLimitCompactionThroughputController.INSTANCE, dummyLock.readLock()); verify(mockCompactor, times(1)).compact(compaction.getRequest(), targetCount, 0L, StripeStoreFileManager.OPEN_KEY, StripeStoreFileManager.OPEN_KEY, null, null, - NoLimitCompactionThroughputController.INSTANCE); + NoLimitCompactionThroughputController.INSTANCE, dummyLock.readLock()); } private static StoreFile createFile() throws Exception { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java index a0579ce..e2fe3b0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java @@ -42,6 +42,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -215,10 +216,11 @@ public class TestStripeCompactionPolicy { assertTrue(policy.needsCompactions(si, al())); StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); assertEquals(si.getStorefiles(), scr.getRequest().getFiles()); - scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE); + ReentrantReadWriteLock dummyLock = new ReentrantReadWriteLock(); + scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, dummyLock.readLock()); verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY), - any(NoLimitCompactionThroughputController.class)); + any(NoLimitCompactionThroughputController.class), eq(dummyLock.readLock())); } @Test @@ -386,11 +388,11 @@ public class TestStripeCompactionPolicy { // Merge all three expired stripes into one. StripeCompactionPolicy.StripeInformationProvider si = createStripesWithFiles(expired, expired, expired); - verifyMergeCompatcion(policy, si, 0, 2); + verifyMergeCompaction(policy, si, 0, 2); // Merge two adjacent expired stripes into one. si = createStripesWithFiles(notExpired, expired, notExpired, expired, expired, notExpired); - verifyMergeCompatcion(policy, si, 3, 4); + verifyMergeCompaction(policy, si, 3, 4); } finally { EnvironmentEdgeManager.reset(); } @@ -459,7 +461,7 @@ public class TestStripeCompactionPolicy { return new ArrayList(Arrays.asList(sfs)); } - private void verifyMergeCompatcion(StripeCompactionPolicy policy, StripeInformationProvider si, + private void verifyMergeCompaction(StripeCompactionPolicy policy, StripeInformationProvider si, int from, int to) throws Exception { StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); Collection sfs = getAllFiles(si, from, to); @@ -468,7 +470,9 @@ public class TestStripeCompactionPolicy { // All the Stripes are expired, so the Compactor will not create any Writers. 
We need to create // an empty file to preserve metadata StripeCompactor sc = createCompactor(); - List paths = scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE); + ReentrantReadWriteLock dummyLock = new ReentrantReadWriteLock(); + List paths = scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, + dummyLock.readLock()); assertEquals(1, paths.size()); } @@ -527,7 +531,8 @@ public class TestStripeCompactionPolicy { assertTrue(policy.needsCompactions(si, al())); StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); verifyCollectionsEqual(sfs, scr.getRequest().getFiles()); - scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE); + ReentrantReadWriteLock dummyLock = new ReentrantReadWriteLock(); + scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, dummyLock.readLock()); verify(sc, times(1)).compact(eq(scr.getRequest()), argThat(new ArgumentMatcher>() { @Override public boolean matches(Object argument) { @@ -541,7 +546,7 @@ public class TestStripeCompactionPolicy { } }), dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom), dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo), - any(NoLimitCompactionThroughputController.class)); + any(NoLimitCompactionThroughputController.class), eq(dummyLock.readLock())); } /** @@ -562,12 +567,13 @@ public class TestStripeCompactionPolicy { assertTrue(!needsCompaction || policy.needsCompactions(si, al())); StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false); verifyCollectionsEqual(sfs, scr.getRequest().getFiles()); - scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE); + ReentrantReadWriteLock dummyLock = new ReentrantReadWriteLock(); + scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, dummyLock.readLock()); verify(sc, times(1)).compact(eq(scr.getRequest()), count == null ? anyInt() : eq(count.intValue()), size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end), dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end), - any(NoLimitCompactionThroughputController.class)); + any(NoLimitCompactionThroughputController.class), eq(dummyLock.readLock())); } /** Verify arbitrary flush. */
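
Reviewer note (not part of the patch): the change narrows the scope of the HRegion read lock. Previously HRegion.compact() held lock.readLock() across the entire compaction; with this patch the lock is threaded down through CompactionContext and the compactors and taken only around the short critical sections -- creating the store file scanners and committing/moving the resulting files -- so a long-running compaction no longer pins the region open. The sketch below only illustrates the intended locking shape; the class and helper methods (createScanners, rewriteData, commitResults) are placeholders for illustration, not HBase APIs.

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockScopedCompactionSketch {
  private final ReentrantReadWriteLock regionLock = new ReentrantReadWriteLock();

  public void compact() {
    final ReentrantReadWriteLock.ReadLock readLock = regionLock.readLock();

    // Critical section 1: open scanners over the selected store files while the
    // region is held open (mirrors DefaultCompactor/StripeCompactor taking the
    // passed-in regionLock around createFileScanners()).
    readLock.lock();
    try {
      createScanners();
    } finally {
      readLock.unlock();
    }

    // The long-running rewrite happens WITHOUT the region lock, so a pending
    // close or split is no longer blocked for the duration of the compaction.
    rewriteData();

    // Critical section 2: publish the results while the region is held open
    // (mirrors HStore.compact() locking around moveCompatedFilesIntoPlace(),
    // writeCompactionWalRecord(), replaceStoreFiles() and completeCompaction()).
    readLock.lock();
    try {
      commitResults();
    } finally {
      readLock.unlock();
    }
  }

  // Placeholder helpers for illustration only; not HBase APIs.
  private void createScanners() { }
  private void rewriteData() { }
  private void commitResults() { }
}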