diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
index a9e2fca..6ce90bc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -352,7 +351,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
     CompactionContext compaction = null;
     if (selectNow) {
-      compaction = selectCompaction(r, s, priority, request);
+      compaction = selectCompaction(r, s, priority, request, user);
       if (compaction == null) return null; // message logged inside
     }
@@ -370,10 +369,10 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
   }
 
   private CompactionContext selectCompaction(final Region r, final Store s,
-      int priority, CompactionRequest request) throws IOException {
-    CompactionContext compaction = s.requestCompaction(priority, request);
+      int priority, CompactionRequest request, User user) throws IOException {
+    CompactionContext compaction = s.requestCompaction(priority, request, user);
     if (compaction == null) {
-      if(LOG.isDebugEnabled()) {
+      if(LOG.isDebugEnabled() && r.getRegionInfo() != null) {
         LOG.debug("Not compacting " + r.getRegionInfo().getRegionNameAsString() +
             " because compaction request was cancelled");
       }
@@ -484,7 +483,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
           : ("Store = " + store.toString() + ", pri = " + queuedPriority);
     }
 
-    private void doCompaction() {
+    private void doCompaction(User user) {
       // Common case - system compaction without a file selection. Select now.
       if (this.compaction == null) {
         int oldPriority = this.queuedPriority;
@@ -496,7 +495,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
           return;
         }
         try {
-          this.compaction = selectCompaction(this.region, this.store, queuedPriority, null);
+          this.compaction = selectCompaction(this.region, this.store, queuedPriority, null, user);
         } catch (IOException ex) {
           LOG.error("Compaction selection failed " + this, ex);
           server.checkFileSystem();
@@ -528,7 +527,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
         // put it into region/store/etc. This is CST logic.
         long start = EnvironmentEdgeManager.currentTime();
         boolean completed =
-            region.compact(compaction, store, compactionThroughputController);
+            region.compact(compaction, store, compactionThroughputController, user);
         long now = EnvironmentEdgeManager.currentTime();
         LOG.info(((completed) ?
             "Completed" : "Aborted") + " compaction: " + this + "; duration=" +
             StringUtils.formatTimeDiff(now, start));
@@ -565,22 +564,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
           || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
         return;
       }
-      if (this.user == null) doCompaction();
-      else {
-        try {
-          user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
-            @Override
-            public Void run() throws Exception {
-              doCompaction();
-              return null;
-            }
-          });
-        } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt();
-        } catch (IOException ioe) {
-          LOG.error("Encountered exception while compacting", ioe);
-        }
-      }
+      doCompaction(user);
     }
 
     private String formatStackTrace(Exception ex) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java
index a30b526..c39f310 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java
@@ -73,15 +73,14 @@ public interface CompactionRequestor {
    * @param requests custom compaction requests. Each compaction must specify the store on which it
    *          is acting. Can be null in which case a compaction will be attempted on all
    *          stores for the region.
-   * @user the effective user
+   * @param user the effective user
    * @return The created {@link CompactionRequest CompactionRequests} or an empty list if no
    *         compactions were started.
    * @throws IOException
    */
   List<CompactionRequest> requestCompaction(
     final Region r, final String why, int pri, List<Pair<CompactionRequest, Store>> requests,
-    User user
-  ) throws IOException;
+    User user) throws IOException;
 
   /**
    * @param r Region to compact
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
index 51e1a2d..da326e3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolic
 import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 
 /**
@@ -121,7 +122,13 @@ public class DefaultStoreEngine extends StoreEngine<
     @Override
     public List<Path> compact(CompactionThroughputController throughputController)
         throws IOException {
-      return compactor.compact(request, throughputController);
+      return compact(throughputController, null);
+    }
+
+    @Override
+    public List<Path> compact(CompactionThroughputController throughputController, User user)
+        throws IOException {
+      return compactor.compact(request, throughputController, user);
     }
 
     @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index b6cdd29..34738de 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -156,6 +157,7 @@
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.ReplayHLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
 import org.apache.hadoop.hbase.util.ByteStringer;
@@ -1715,7 +1716,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           if (controller == null) {
             controller = NoLimitCompactionThroughputController.INSTANCE;
           }
-          compact(compaction, s, controller);
+          compact(compaction, s, controller, null);
         }
       }
     }
@@ -1730,7 +1731,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     for (Store s : getStores()) {
       CompactionContext compaction = s.requestCompaction();
       if (compaction != null) {
-        compact(compaction, s, NoLimitCompactionThroughputController.INSTANCE);
+        compact(compaction, s, NoLimitCompactionThroughputController.INSTANCE, null);
       }
     }
   }
@@ -1747,7 +1748,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     Store s = getStore(family);
     CompactionContext compaction = s.requestCompaction();
     if (compaction != null) {
-      compact(compaction, s, throughputController);
+      compact(compaction, s, throughputController, null);
     }
   }
 
@@ -1763,10 +1764,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * server does them sequentially and not in parallel.
    *
    * @param compaction Compaction details, obtained by requestCompaction()
+   * @param throughputController
    * @return whether the compaction completed
    */
   public boolean compact(CompactionContext compaction, Store store,
       CompactionThroughputController throughputController) throws IOException {
+    return compact(compaction, store, throughputController, null);
+  }
+
+  public boolean compact(CompactionContext compaction, Store store,
+      CompactionThroughputController throughputController, User user) throws IOException {
     assert compaction != null && compaction.hasSelection();
     assert !compaction.getRequest().getFiles().isEmpty();
     if (this.closing.get() || this.closed.get()) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index cfda1c6..b60948a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -23,6 +23,7 @@ import java.io.InterruptedIOException;
 import java.net.InetSocketAddress;
 import java.security.Key;
 import java.security.KeyException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -1206,6 +1207,12 @@
   @Override
   public List<StoreFile> compact(CompactionContext compaction,
       CompactionThroughputController throughputController) throws IOException {
+    return compact(compaction, throughputController, null);
+  }
+
+  @Override
+  public List<StoreFile> compact(CompactionContext compaction,
+      CompactionThroughputController throughputController, User user) throws IOException {
     assert compaction != null;
     List<StoreFile> sfs = null;
     CompactionRequest cr = compaction.getRequest();
@@ -1230,7 +1237,7 @@
         + TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));
 
     // Commence the compaction.
-    List<Path> newFiles = compaction.compact(throughputController);
+    List<Path> newFiles = compaction.compact(throughputController, user);
 
     // TODO: get rid of this!
     if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
@@ -1245,7 +1252,7 @@
       return sfs;
     }
     // Do the steps necessary to complete the compaction.
-    sfs = moveCompatedFilesIntoPlace(cr, newFiles);
+    sfs = moveCompatedFilesIntoPlace(cr, newFiles, user);
     writeCompactionWalRecord(filesToCompact, sfs);
     replaceStoreFiles(filesToCompact, sfs);
     if (cr.isMajor()) {
@@ -1266,13 +1273,30 @@
   }
 
   private List<StoreFile> moveCompatedFilesIntoPlace(
-      CompactionRequest cr, List<Path> newFiles) throws IOException {
+      final CompactionRequest cr, List<Path> newFiles, User user) throws IOException {
     List<StoreFile> sfs = new ArrayList<StoreFile>(newFiles.size());
     for (Path newFile : newFiles) {
       assert newFile != null;
-      StoreFile sf = moveFileIntoPlace(newFile);
+      final StoreFile sf = moveFileIntoPlace(newFile);
       if (this.getCoprocessorHost() != null) {
-        this.getCoprocessorHost().postCompact(this, sf, cr);
+        final Store thisStore = this;
+        if (user == null) {
+          getCoprocessorHost().postCompact(thisStore, sf, cr);
+        } else {
+          try {
+            user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
+              @Override
+              public Void run() throws Exception {
+                getCoprocessorHost().postCompact(thisStore, sf, cr);
+                return null;
+              }
+            });
+          } catch (InterruptedException ie) {
+            InterruptedIOException iioe = new InterruptedIOException();
+            iioe.initCause(ie);
+            throw iioe;
+          }
+        }
       }
       assert sf != null;
       sfs.add(sf);
@@ -1519,6 +1543,11 @@
   @Override
   public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
       throws IOException {
+    return requestCompaction(priority, baseRequest, null);
+  }
+  @Override
+  public CompactionContext requestCompaction(int priority, final CompactionRequest baseRequest,
+      User user) throws IOException {
     // don't even select for compaction if writes are disabled
     if (!this.areWritesEnabled()) {
       return null;
     }
@@ -1527,16 +1556,34 @@
     // Before we do compaction, try to get rid of unneeded files to simplify things.
     removeUnneededFiles();
 
-    CompactionContext compaction = storeEngine.createCompaction();
+    final CompactionContext compaction = storeEngine.createCompaction();
     CompactionRequest request = null;
     this.lock.readLock().lock();
     try {
       synchronized (filesCompacting) {
+        final Store thisStore = this;
         // First, see if coprocessor would want to override selection.
         if (this.getCoprocessorHost() != null) {
-          List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
-          boolean override = this.getCoprocessorHost().preCompactSelection(
-              this, candidatesForCoproc, baseRequest);
+          final List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
+          boolean override = false;
+          if (user == null) {
+            override = getCoprocessorHost().preCompactSelection(this, candidatesForCoproc,
+              baseRequest);
+          } else {
+            try {
+              override = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() {
+                @Override
+                public Boolean run() throws Exception {
+                  return getCoprocessorHost().preCompactSelection(thisStore, candidatesForCoproc,
+                    baseRequest);
+                }
+              });
+            } catch (InterruptedException ie) {
+              InterruptedIOException iioe = new InterruptedIOException();
+              iioe.initCause(ie);
+              throw iioe;
+            }
+          }
           if (override) {
             // Coprocessor is overriding normal file selection.
             compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
@@ -1564,8 +1611,25 @@
         }
       }
       if (this.getCoprocessorHost() != null) {
-        this.getCoprocessorHost().postCompactSelection(
+        if (user == null) {
+          this.getCoprocessorHost().postCompactSelection(
           this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);
+        } else {
+          try {
+            user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
+              @Override
+              public Void run() throws Exception {
+                getCoprocessorHost().postCompactSelection(
+                  thisStore, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);
+                return null;
+              }
+            });
+          } catch (InterruptedException ie) {
+            InterruptedIOException iioe = new InterruptedIOException();
+            iioe.initCause(ie);
+            throw iioe;
+          }
+        }
       }
 
       // Selected files; see if we have a compaction with some custom base request.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 8d35a7d..87bfab7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Pair;
 
 /**
@@ -193,14 +194,22 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
   CompactionContext requestCompaction() throws IOException;
 
+  @Deprecated
   CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
       throws IOException;
 
+  CompactionContext requestCompaction(int priority, CompactionRequest baseRequest, User user)
+      throws IOException;
+
   void cancelRequestedCompaction(CompactionContext compaction);
 
+  @Deprecated
   List<StoreFile> compact(CompactionContext compaction,
       CompactionThroughputController throughputController) throws IOException;
 
+  List<StoreFile> compact(CompactionContext compaction,
+      CompactionThroughputController throughputController, User user) throws IOException;
+
   /**
    * @return true if we should run a major compaction.
   */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java
index 26339a3..9ec4657 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
+import org.apache.hadoop.hbase.security.User;
 
 import com.google.common.base.Preconditions;
 
@@ -104,5 +105,12 @@ public class StripeStoreEngine extends StoreEngine<
       Preconditions.checkArgument(this.stripeRequest != null, "Cannot compact without selection");
       return this.stripeRequest.execute(compactor, throughputController);
     }
+
+    @Override
+    public List<Path> compact(CompactionThroughputController throughputController, User user)
+        throws IOException {
+      Preconditions.checkArgument(this.stripeRequest != null, "Cannot compact without selection");
+      return this.stripeRequest.execute(compactor, throughputController);
+    }
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java
index 1c89bf0..cb16966 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.security.User;
 
 /**
@@ -71,6 +72,9 @@ public abstract class CompactionContext {
   public abstract List<Path> compact(CompactionThroughputController throughputController)
       throws IOException;
 
+  public abstract List<Path> compact(CompactionThroughputController throughputController, User user)
+      throws IOException;
+
   public CompactionRequest getRequest() {
     assert hasSelection();
     return this.request;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index eaccd0d..660ea91 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver.compactions;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -47,6 +48,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -224,9 +226,31 @@
   */
  protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
      ScanType scanType, long earliestPutTs, List<StoreFileScanner> scanners) throws IOException {
+    return preCreateCoprocScanner(request, scanType, earliestPutTs, scanners, null);
+  }
+
+  protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
+      final ScanType scanType, final long earliestPutTs, final List<StoreFileScanner> scanners,
+      User user) throws IOException {
     if (store.getCoprocessorHost() == null) return null;
-    return store.getCoprocessorHost()
-        .preCompactScannerOpen(store, scanners, scanType, earliestPutTs, request);
+    if (user == null) {
+      return store.getCoprocessorHost().preCompactScannerOpen(store, scanners, scanType,
+        earliestPutTs, request);
+    } else {
+      try {
+        return user.getUGI().doAs(new PrivilegedExceptionAction<InternalScanner>() {
+          @Override
+          public InternalScanner run() throws Exception {
+            return store.getCoprocessorHost().preCompactScannerOpen(store, scanners,
+              scanType, earliestPutTs, request);
+          }
+        });
+      } catch (InterruptedException ie) {
+        InterruptedIOException iioe = new InterruptedIOException();
+        iioe.initCause(ie);
+        throw iioe;
+      }
+    }
   }
 
   /**
@@ -237,9 +261,24 @@
    * @return Scanner scanner to use (usually the default); null if compaction should not proceed.
    */
   protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
-      ScanType scanType, InternalScanner scanner) throws IOException {
-    if (store.getCoprocessorHost() == null) return scanner;
-    return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
+      final ScanType scanType, final InternalScanner scanner, User user) throws IOException {
+    if (store.getCoprocessorHost() == null) return scanner;
+    if (user == null) {
+      return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
+    } else {
+      try {
+        return user.getUGI().doAs(new PrivilegedExceptionAction<InternalScanner>() {
+          @Override
+          public InternalScanner run() throws Exception {
+            return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
+          }
+        });
+      } catch (InterruptedException ie) {
+        InterruptedIOException iioe = new InterruptedIOException();
+        iioe.initCause(ie);
+        throw iioe;
+      }
+    }
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index f26f4fe..d260c26 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -34,10 +34,11 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.security.User;
 
 /**
  * Compact passed set of files. Create an instance and then call
- * {@link #compact(CompactionRequest, CompactionThroughputController)}
+ * {@link #compact(CompactionRequest, CompactionThroughputController, User)}
  */
 @InterfaceAudience.Private
 public class DefaultCompactor extends Compactor {
@@ -51,7 +52,7 @@ public class DefaultCompactor extends Compactor {
   * Do a minor/major compaction on an explicit set of storefiles from a Store.
   */
  public List<Path> compact(final CompactionRequest request,
-      CompactionThroughputController throughputController) throws IOException {
+      CompactionThroughputController throughputController, User user) throws IOException {
     FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
     this.progress = new CompactionProgress(fd.maxKeyCount);
@@ -91,7 +92,7 @@ public class DefaultCompactor extends Compactor {
       if (scanner == null) {
         scanner = createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs);
       }
-      scanner = postCreateCoprocScanner(request, scanType, scanner);
+      scanner = postCreateCoprocScanner(request, scanType, scanner, user);
       if (scanner == null) {
         // NULL scanner returned from coprocessor hooks means skip normal processing.
         return newFiles;
@@ -172,7 +173,7 @@ public class DefaultCompactor extends Compactor {
   /**
    * Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to
-   * {@link #compact(CompactionRequest, CompactionThroughputController)};
+   * {@link #compact(CompactionRequest, CompactionThroughputController, User)};
    * @param filesToCompact the files to compact. These are used as the compactionSelection for
    *          the generated {@link CompactionRequest}.
    * @param isMajor true to major compact (prune all deletes, max versions, etc)
@@ -184,6 +185,6 @@ public class DefaultCompactor extends Compactor {
       throws IOException {
     CompactionRequest cr = new CompactionRequest(filesToCompact);
     cr.setIsMajor(isMajor, isMajor);
-    return this.compact(cr, NoLimitCompactionThroughputController.INSTANCE);
+    return this.compact(cr, NoLimitCompactionThroughputController.INSTANCE, null);
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index 6814b8c..99ee5b0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
 import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -64,7 +65,7 @@ public class StripeCompactor extends Compactor {
     StripeMultiFileWriter writer = new StripeMultiFileWriter.BoundaryMultiWriter(
         targetBoundaries, majorRangeFromRow, majorRangeToRow);
     return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow,
-        throughputController);
+        throughputController, null);
   }
 
   public List<Path> compact(CompactionRequest request, int targetCount, long targetSize,
@@ -78,12 +79,12 @@
     StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(
         targetCount, targetSize, left, right);
     return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow,
-        throughputController);
+        throughputController, null);
   }
 
   private List<Path> compactInternal(StripeMultiFileWriter mw, final CompactionRequest request,
       byte[] majorRangeFromRow, byte[] majorRangeToRow,
-      CompactionThroughputController throughputController) throws IOException {
+      CompactionThroughputController throughputController, User user) throws IOException {
     final Collection<StoreFile> filesToCompact = request.getFiles();
     final FileDetails fd = getFileDetails(filesToCompact, request.isMajor());
     this.progress = new CompactionProgress(fd.maxKeyCount);
@@ -98,7 +99,7 @@
     try {
       // Get scanner to use.
       ScanType coprocScanType = ScanType.COMPACT_RETAIN_DELETES;
-      scanner = preCreateCoprocScanner(request, coprocScanType, fd.earliestPutTs, scanners);
+      scanner = preCreateCoprocScanner(request, coprocScanType, fd.earliestPutTs, scanners, user);
       if (scanner == null) {
         scanner = (majorRangeFromRow == null)
             ? createScanner(store, scanners,
@@ -106,7 +107,7 @@
             : createScanner(store, scanners, smallestReadPoint, fd.earliestPutTs,
                 majorRangeFromRow, majorRangeToRow);
       }
-      scanner = postCreateCoprocScanner(request, coprocScanType, scanner);
+      scanner = postCreateCoprocScanner(request, coprocScanType, scanner, user);
       if (scanner == null) {
         // NULL scanner returned from coprocessor hooks means skip normal processing.
         return new ArrayList<Path>();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index bb216b6..c2a23d6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -129,6 +130,16 @@ public class TestIOFencing {
       }
     }
 
+    @Override
+    public boolean compact(CompactionContext compaction, Store store,
+        CompactionThroughputController throughputController, User user) throws IOException {
+      try {
+        return super.compact(compaction, store, throughputController, user);
+      } finally {
+        compactCount++;
+      }
+    }
+
     public int countStoreFiles() {
       int count = 0;
       for (Store store : stores.values()) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
index 00808bd..44e06bd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -239,6 +240,14 @@
       if (ret) compactionStateChangeLatch.countDown();
       return ret;
     }
+
+    @Override
+    public boolean compact(CompactionContext compaction, Store store,
+        CompactionThroughputController throughputController, User user) throws IOException {
+      boolean ret = super.compact(compaction, store, throughputController, user);
+      if (ret) compactionStateChangeLatch.countDown();
+      return ret;
+    }
   }
 
   /**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/StatefulStoreMockMaker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/StatefulStoreMockMaker.java
index dd9c037..0526462 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/StatefulStoreMockMaker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/StatefulStoreMockMaker.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.*;
 
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.security.User;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -56,6 +57,8 @@ public class StatefulStoreMockMaker {
     Store store = mock(Store.class, name);
     when(store.requestCompaction(
         anyInt(), isNull(CompactionRequest.class))).then(new SelectAnswer());
+    when(store.requestCompaction(
+        anyInt(), isNull(CompactionRequest.class), any(User.class))).then(new SelectAnswer());
     when(store.getCompactPriority()).then(new PriorityAnswer());
     doAnswer(new CancelAnswer()).when(
         store).cancelRequestedCompaction(any(CompactionContext.class));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
index a377325..0460972 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.HBaseTestCase.HRegionIncommon;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.client.Delete;
@@ -367,6 +368,12 @@
     @Override
     public List<Path> compact(CompactionThroughputController throughputController)
         throws IOException {
+      return compact(throughputController, null);
+    }
+
+    @Override
+    public List<Path> compact(CompactionThroughputController throughputController, User user)
+        throws IOException {
       finishCompaction(this.selectedFiles);
       return new ArrayList<Path>();
     }
@@ -419,6 +426,12 @@
     @Override
     public List<Path> compact(CompactionThroughputController throughputController)
         throws IOException {
+      return compact(throughputController, null);
+    }
+
+    @Override
+    public List<Path> compact(CompactionThroughputController throughputController, User user)
+        throws IOException {
       try {
         isInCompact = true;
         synchronized (this) {
@@ -500,7 +513,7 @@
     HRegion r = mock(HRegion.class);
     when(
       r.compact(any(CompactionContext.class), any(Store.class),
-        any(CompactionThroughputController.class))).then(new Answer<Boolean>() {
+        any(CompactionThroughputController.class), any(User.class))).then(new Answer<Boolean>() {
       public Boolean answer(InvocationOnMock invocation) throws Throwable {
         invocation.getArgumentAt(0, CompactionContext.class).compact(
           invocation.getArgumentAt(2, CompactionThroughputController.class));
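The move this patch repeats at every coprocessor hook site — call the hook through User.getUGI().doAs(...) when a request user is present, call it directly otherwise, and translate InterruptedException into InterruptedIOException — can be sketched in isolation. The helper below is an illustrative sketch only, not HBase code (RunAsUserSketch and runAs are invented names); it assumes nothing beyond Hadoop's UserGroupInformation API:

import java.io.IOException;
import java.io.InterruptedIOException;
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.security.UserGroupInformation;

public class RunAsUserSketch {

  // Mirrors the "if (user == null) ... else user.getUGI().doAs(...)" branches the
  // patch adds around preCompactSelection, postCompactSelection, preCompact and
  // postCompact in HStore and Compactor.
  static <T> T runAs(UserGroupInformation ugi, PrivilegedExceptionAction<T> action)
      throws IOException {
    if (ugi == null) {
      // No request user: invoke directly, like the null-user branches in the patch.
      try {
        return action.run();
      } catch (Exception e) {
        throw new IOException(e);
      }
    }
    try {
      return ugi.doAs(action);
    } catch (InterruptedException ie) {
      // Same translation the patch uses at each call site: surface the interrupt
      // as an IOException subtype so callers declaring IOException still compile.
      InterruptedIOException iioe = new InterruptedIOException();
      iioe.initCause(ie);
      throw iioe;
    }
  }

  public static void main(String[] args) throws IOException {
    UserGroupInformation ugi = UserGroupInformation.createRemoteUser("compaction-requester");
    String name = runAs(ugi, new PrivilegedExceptionAction<String>() {
      @Override
      public String run() throws Exception {
        // Inside doAs the current user is the request user, not the server principal.
        return UserGroupInformation.getCurrentUser().getUserName();
      }
    });
    System.out.println("hook ran as: " + name);
  }
}

Pushing the doAs from CompactionRunner.run() down to the individual hook calls means only coprocessor code executes under the requesting user's credentials; the compaction itself (scanner creation, file moves, WAL records) keeps running as the region server.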