diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
index 2766b5b..3e73ec1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -330,7 +329,7 @@ public class CompactSplitThread implements CompactionRequestor {
     CompactionContext compaction = null;
     if (selectNow) {
-      compaction = selectCompaction(r, s, priority, request);
+      compaction = selectCompaction(r, s, priority, request, user);
       if (compaction == null) return null; // message logged inside
     }
@@ -348,8 +347,8 @@ public class CompactSplitThread implements CompactionRequestor {
   }

   private CompactionContext selectCompaction(final HRegion r, final Store s,
-      int priority, CompactionRequest request) throws IOException {
-    CompactionContext compaction = s.requestCompaction(priority, request);
+      int priority, CompactionRequest request, User user) throws IOException {
+    CompactionContext compaction = s.requestCompaction(priority, request, user);
     if (compaction == null) {
       if(LOG.isDebugEnabled()) {
         LOG.debug("Not compacting " + r.getRegionNameAsString() +
@@ -460,7 +459,7 @@ public class CompactSplitThread implements CompactionRequestor {
           : ("Store = " + store.toString() + ", pri = " + queuedPriority);
     }

-    private void doCompaction() {
+    private void doCompaction(User user) {
       // Common case - system compaction without a file selection. Select now.
       if (this.compaction == null) {
         int oldPriority = this.queuedPriority;
@@ -472,7 +471,7 @@ public class CompactSplitThread implements CompactionRequestor {
           return;
         }
         try {
-          this.compaction = selectCompaction(this.region, this.store, queuedPriority, null);
+          this.compaction = selectCompaction(this.region, this.store, queuedPriority, null, user);
         } catch (IOException ex) {
           LOG.error("Compaction selection failed " + this, ex);
           server.checkFileSystem();
@@ -500,7 +499,7 @@ public class CompactSplitThread implements CompactionRequestor {
       // Note: please don't put single-compaction logic here;
       //       put it into region/store/etc. This is CST logic.
       long start = EnvironmentEdgeManager.currentTimeMillis();
-      boolean completed = region.compact(compaction, store, compactionThroughputController);
+      boolean completed = region.compact(compaction, store, compactionThroughputController, user);
       long now = EnvironmentEdgeManager.currentTimeMillis();
       LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
           this + "; duration=" + StringUtils.formatTimeDiff(now, start));
@@ -536,22 +535,7 @@ public class CompactSplitThread implements CompactionRequestor {
           || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
         return;
       }
-      if (this.user == null) doCompaction();
-      else {
-        try {
-          user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
-            @Override
-            public Void run() throws Exception {
-              doCompaction();
-              return null;
-            }
-          });
-        } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt();
-        } catch (IOException ioe) {
-          LOG.error("Encountered exception while compacting", ioe);
-        }
-      }
+      doCompaction(user);
     }

     private String formatStackTrace(Exception ex) {
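
Note on the hunks above: rather than wrapping the whole CompactionRunner in a doAs(), the effective User is now carried as an explicit parameter through selectCompaction()/doCompaction() down to the individual coprocessor call sites. The deleted block was an instance of the following general idiom (a standalone sketch, not the actual HBase class; names here are illustrative):

    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.security.UserGroupInformation;

    // The coarse-grained pattern being removed: run an entire unit of work
    // under the caller's UGI, handling interrupts at the outermost level.
    final class RunAsSketch {
      static void runAs(UserGroupInformation ugi, final Runnable work) throws IOException {
        try {
          ugi.doAs(new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() {
              work.run();
              return null;
            }
          });
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
        }
      }
    }

The drawback of the coarse wrapper was that everything in doCompaction(), including pure bookkeeping, ran under the request user; after this patch only the coprocessor invocations do.
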
"Completed" : "Aborted") + " compaction: " + this + "; duration=" + StringUtils.formatTimeDiff(now, start)); @@ -536,22 +535,7 @@ public class CompactSplitThread implements CompactionRequestor { || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) { return; } - if (this.user == null) doCompaction(); - else { - try { - user.getUGI().doAs(new PrivilegedExceptionAction() { - @Override - public Void run() throws Exception { - doCompaction(); - return null; - } - }); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } catch (IOException ioe) { - LOG.error("Encountered exception while compacting", ioe); - } - } + doCompaction(user); } private String formatStackTrace(Exception ex) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java index 1fa4b8e..b304ed9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java @@ -73,15 +73,14 @@ public interface CompactionRequestor { * @param requests custom compaction requests. Each compaction must specify the store on which it * is acting. Can be null in which case a compaction will be attempted on all * stores for the region. - * @user the effective user + * @param user the effective user * @return The created {@link CompactionRequest CompactionRequests} or an empty list if no * compactions were started. * @throws IOException */ List requestCompaction( final HRegion r, final String why, int pri, List> requests, - User user - ) throws IOException; + User user) throws IOException; /** * @param r Region to compact diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java index 3c1345d..727152c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolic import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.ReflectionUtils; /** @@ -109,7 +110,13 @@ public class DefaultStoreEngine extends StoreEngine< @Override public List compact(CompactionThroughputController throughputController) throws IOException { - return compactor.compact(request, throughputController); + return compact(throughputController, null); + } + + @Override + public List compact(CompactionThroughputController throughputController, + User user) throws IOException { + return compactor.compact(request, throughputController, user); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index a69b6a6..02168e4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -139,6 +139,7 @@ import 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index a69b6a6..02168e4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -139,6 +139,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.MutationReplay;
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -1472,7 +1473,7 @@ public class HRegion implements HeapSize { // , Writable{
     for (Store s : getStores().values()) {
       CompactionContext compaction = s.requestCompaction();
       if (compaction != null) {
-        compact(compaction, s, NoLimitCompactionThroughputController.INSTANCE);
+        compact(compaction, s, NoLimitCompactionThroughputController.INSTANCE, null);
       }
     }
   }
@@ -1493,6 +1494,11 @@ public class HRegion implements HeapSize { // , Writable{
    */
   public boolean compact(CompactionContext compaction, Store store,
       CompactionThroughputController throughputController) throws IOException {
+    return compact(compaction, store, throughputController, null);
+  }
+
+  public boolean compact(CompactionContext compaction, Store store,
+      CompactionThroughputController throughputController, User user) throws IOException {
     assert compaction != null && compaction.hasSelection();
     assert !compaction.getRequest().getFiles().isEmpty();
     if (this.closing.get() || this.closed.get()) {
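
HRegion gets the same treatment: the three-argument compact() stays for compatibility and forwards a null User. A caller-side sketch against the patched signatures (the wrapper class here is invented for illustration):

    import java.io.IOException;
    import org.apache.hadoop.hbase.regionserver.HRegion;
    import org.apache.hadoop.hbase.regionserver.Store;
    import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
    import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
    import org.apache.hadoop.hbase.security.User;

    final class CompactAsUserSketch {
      // Passing a non-null User makes the store-level code run the compaction
      // coprocessor hooks under that user's UGI; null keeps the pre-patch
      // behaviour.
      static boolean compactAs(HRegion region, CompactionContext ctx, Store store, User user)
          throws IOException {
        return region.compact(ctx, store, NoLimitCompactionThroughputController.INSTANCE, user);
      }
    }
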
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index da83e4b..5121bbf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -23,6 +23,7 @@ import java.io.InterruptedIOException;
 import java.net.InetSocketAddress;
 import java.security.Key;
 import java.security.KeyException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -1096,6 +1097,12 @@ public class HStore implements Store {
   @Override
   public List<StoreFile> compact(CompactionContext compaction,
       CompactionThroughputController throughputController) throws IOException {
+    return compact(compaction, throughputController, null);
+  }
+
+  @Override
+  public List<StoreFile> compact(CompactionContext compaction,
+      CompactionThroughputController throughputController, User user) throws IOException {
     assert compaction != null;
     List<StoreFile> sfs = null;
     CompactionRequest cr = compaction.getRequest();
@@ -1120,7 +1127,7 @@ public class HStore implements Store {
         + StringUtils.humanReadableInt(cr.getSize()));

     // Commence the compaction.
-    List<Path> newFiles = compaction.compact(throughputController);
+    List<Path> newFiles = compaction.compact(throughputController, user);

     // TODO: get rid of this!
     if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
@@ -1135,7 +1142,7 @@ public class HStore implements Store {
       return sfs;
     }
     // Do the steps necessary to complete the compaction.
-    sfs = moveCompatedFilesIntoPlace(cr, newFiles);
+    sfs = moveCompatedFilesIntoPlace(cr, newFiles, user);
     writeCompactionWalRecord(filesToCompact, sfs);
     replaceStoreFiles(filesToCompact, sfs);
     if (cr.isMajor()) {
@@ -1156,13 +1163,30 @@ public class HStore implements Store {
   }

   private List<StoreFile> moveCompatedFilesIntoPlace(
-      CompactionRequest cr, List<Path> newFiles) throws IOException {
+      final CompactionRequest cr, List<Path> newFiles, User user) throws IOException {
     List<StoreFile> sfs = new ArrayList<StoreFile>(newFiles.size());
     for (Path newFile : newFiles) {
       assert newFile != null;
-      StoreFile sf = moveFileIntoPlace(newFile);
+      final StoreFile sf = moveFileIntoPlace(newFile);
       if (this.getCoprocessorHost() != null) {
-        this.getCoprocessorHost().postCompact(this, sf, cr);
+        final Store thisStore = this;
+        if (user == null) {
+          getCoprocessorHost().postCompact(thisStore, sf, cr);
+        } else {
+          try {
+            user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
+              @Override
+              public Void run() throws Exception {
+                getCoprocessorHost().postCompact(thisStore, sf, cr);
+                return null;
+              }
+            });
+          } catch (InterruptedException ie) {
+            InterruptedIOException iioe = new InterruptedIOException();
+            iioe.initCause(ie);
+            throw iioe;
+          }
+        }
       }
       assert sf != null;
       sfs.add(sf);
@@ -1389,6 +1413,11 @@ public class HStore implements Store {
   @Override
   public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
       throws IOException {
+    return requestCompaction(priority, baseRequest, null);
+  }
+  @Override
+  public CompactionContext requestCompaction(int priority, final CompactionRequest baseRequest,
+      User user) throws IOException {
     // don't even select for compaction if writes are disabled
     if (!this.areWritesEnabled()) {
       return null;
@@ -1397,15 +1426,33 @@ public class HStore implements Store {
     // Before we do compaction, try to get rid of unneeded files to simplify things.
     removeUnneededFiles();

-    CompactionContext compaction = storeEngine.createCompaction();
+    final CompactionContext compaction = storeEngine.createCompaction();
     this.lock.readLock().lock();
     try {
       synchronized (filesCompacting) {
+        final Store thisStore = this;
         // First, see if coprocessor would want to override selection.
         if (this.getCoprocessorHost() != null) {
-          List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
-          boolean override = this.getCoprocessorHost().preCompactSelection(
-            this, candidatesForCoproc, baseRequest);
+          final List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
+          boolean override = false;
+          if (user == null) {
+            override = getCoprocessorHost().preCompactSelection(this, candidatesForCoproc,
+              baseRequest);
+          } else {
+            try {
+              override = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() {
+                @Override
+                public Boolean run() throws Exception {
+                  return getCoprocessorHost().preCompactSelection(thisStore, candidatesForCoproc,
+                    baseRequest);
+                }
+              });
+            } catch (InterruptedException ie) {
+              InterruptedIOException iioe = new InterruptedIOException();
+              iioe.initCause(ie);
+              throw iioe;
+            }
+          }
           if (override) {
             // Coprocessor is overriding normal file selection.
             compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
@@ -1433,8 +1480,25 @@ public class HStore implements Store {
         }
       }
       if (this.getCoprocessorHost() != null) {
-        this.getCoprocessorHost().postCompactSelection(
+        if (user == null) {
+          this.getCoprocessorHost().postCompactSelection(
             this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);
+        } else {
+          try {
+            user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
+              @Override
+              public Void run() throws Exception {
+                getCoprocessorHost().postCompactSelection(
+                  thisStore, ImmutableList.copyOf(compaction.getRequest().getFiles()),
+                  baseRequest);
+                return null;
+              }
+            });
+          } catch (InterruptedException ie) {
+            InterruptedIOException iioe = new InterruptedIOException();
+            iioe.initCause(ie);
+            throw iioe;
+          }
+        }
       }

       // Selected files; see if we have a compaction with some custom base request.
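
HStore above is where the real work happens: each coprocessor call site (preCompactSelection, postCompactSelection, postCompact) is wrapped in user.getUGI().doAs(...) when a user is present, and InterruptedException is converted to InterruptedIOException so the methods keep their throws IOException contracts. The recurring try/catch could be factored into a helper like the following (an assumption on my part; the patch deliberately inlines the pattern at each site):

    import java.io.IOException;
    import java.io.InterruptedIOException;
    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.security.UserGroupInformation;

    // Generic form of the inlined pattern: run the action as the given UGI
    // and rethrow an interrupt as InterruptedIOException.
    final class PrivilegedCall {
      static <T> T call(UserGroupInformation ugi, PrivilegedExceptionAction<T> action)
          throws IOException {
        try {
          return ugi.doAs(action);
        } catch (InterruptedException ie) {
          InterruptedIOException iioe = new InterruptedIOException();
          iioe.initCause(ie);
          throw iioe;
        }
      }
    }
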
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index ec3d54a..532208a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
+import org.apache.hadoop.hbase.security.User;

 /**
  * Interface for objects that hold a column family in a Region. Its a memstore and a set of zero or
@@ -204,14 +205,28 @@
   CompactionContext requestCompaction() throws IOException;

+  /**
+   * @deprecated see requestCompaction(int, CompactionRequest, User)
+   */
+  @Deprecated
   CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
       throws IOException;

+  CompactionContext requestCompaction(int priority, CompactionRequest baseRequest, User user)
+      throws IOException;
+
   void cancelRequestedCompaction(CompactionContext compaction);

+  /**
+   * @deprecated see compact(CompactionContext, CompactionThroughputController, User)
+   */
+  @Deprecated
   List<StoreFile> compact(CompactionContext compaction,
       CompactionThroughputController throughputController) throws IOException;

+  List<StoreFile> compact(CompactionContext compaction,
+      CompactionThroughputController throughputController, User user) throws IOException;
+
   /**
    * @return true if we should run a major compaction.
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java
index b910527..0c01fd6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
+import org.apache.hadoop.hbase.security.User;

 import com.google.common.base.Preconditions;
@@ -102,7 +103,15 @@ public class StripeStoreEngine extends StoreEngine<StripeStoreFlusher,
     @Override
     public List<Path> compact(CompactionThroughputController throughputController)
         throws IOException {
       Preconditions.checkArgument(this.stripeRequest != null, "Cannot compact without selection");
-      return this.stripeRequest.execute(compactor, throughputController);
+      return this.stripeRequest.execute(compactor, throughputController, null);
     }
+
+    @Override
+    public List<Path> compact(CompactionThroughputController throughputController, User user)
+        throws IOException {
+      Preconditions.checkArgument(this.stripeRequest != null, "Cannot compact without selection");
+      return this.stripeRequest.execute(compactor, throughputController, user);
+    }
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java
index 1c89bf0..5cbb03a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.security.User;

 /**
@@ -71,6 +72,9 @@
   public abstract List<Path> compact(CompactionThroughputController throughputController)
       throws IOException;

+  public abstract List<Path> compact(CompactionThroughputController throughputController,
+      User user) throws IOException;
+
   public CompactionRequest getRequest() {
     assert hasSelection();
     return this.request;
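
Store's old two-argument requestCompaction()/compact() overloads are now @Deprecated in favour of the User-aware ones, and CompactionContext grows a matching abstract compact(throughputController, user). Migration for a caller is mechanical; a sketch against the patched interface (User.getCurrent() is the standard way to capture the current identity, and may simply yield the server principal):

    import java.io.IOException;
    import org.apache.hadoop.hbase.regionserver.Store;
    import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
    import org.apache.hadoop.hbase.security.User;

    final class RequestAsUserSketch {
      static CompactionContext request(Store store, int priority) throws IOException {
        User user = User.getCurrent();
        // null baseRequest == let the store select files, as before
        return store.requestCompaction(priority, null, user);
      }
    }
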
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 42e4953..1109cb1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver.compactions;
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -44,6 +45,7 @@ import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.util.StringUtils;
@@ -188,9 +190,31 @@
    */
   protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
       ScanType scanType, long earliestPutTs, List<StoreFileScanner> scanners) throws IOException {
+    return preCreateCoprocScanner(request, scanType, earliestPutTs, scanners, null);
+  }
+
+  protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
+      final ScanType scanType, final long earliestPutTs, final List<StoreFileScanner> scanners,
+      User user) throws IOException {
     if (store.getCoprocessorHost() == null) return null;
-    return store.getCoprocessorHost()
-        .preCompactScannerOpen(store, scanners, scanType, earliestPutTs, request);
+    if (user == null) {
+      return store.getCoprocessorHost().preCompactScannerOpen(store, scanners, scanType,
+        earliestPutTs, request);
+    } else {
+      try {
+        return user.getUGI().doAs(new PrivilegedExceptionAction<InternalScanner>() {
+          @Override
+          public InternalScanner run() throws Exception {
+            return store.getCoprocessorHost().preCompactScannerOpen(store, scanners,
+              scanType, earliestPutTs, request);
+          }
+        });
+      } catch (InterruptedException ie) {
+        InterruptedIOException iioe = new InterruptedIOException();
+        iioe.initCause(ie);
+        throw iioe;
+      }
+    }
   }

 /**
@@ -201,9 +225,24 @@
    * @return Scanner scanner to use (usually the default); null if compaction should not proceed.
    */
   protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
-      ScanType scanType, InternalScanner scanner) throws IOException {
-    if (store.getCoprocessorHost() == null) return scanner;
-    return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
+      final ScanType scanType, final InternalScanner scanner, User user) throws IOException {
+    if (store.getCoprocessorHost() == null) return scanner;
+    if (user == null) {
+      return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
+    } else {
+      try {
+        return user.getUGI().doAs(new PrivilegedExceptionAction<InternalScanner>() {
+          @Override
+          public InternalScanner run() throws Exception {
+            return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
+          }
+        });
+      } catch (InterruptedException ie) {
+        InterruptedIOException iioe = new InterruptedIOException();
+        iioe.initCause(ie);
+        throw iioe;
+      }
+    }
   }

   /**
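
In Compactor, both scanner-producing hooks (preCompactScannerOpen via preCreateCoprocScanner, and preCompact via postCreateCoprocScanner) now execute under the supplied user. That matters because whatever a RegionObserver does in these hooks, e.g. opening files, now happens with the request user's credentials. An illustrative observer (not part of the patch) that would see the difference:

    import java.io.IOException;
    import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.regionserver.InternalScanner;
    import org.apache.hadoop.hbase.regionserver.ScanType;
    import org.apache.hadoop.hbase.regionserver.Store;
    import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
    import org.apache.hadoop.security.UserGroupInformation;

    public class WhoAmIObserver extends BaseRegionObserver {
      @Override
      public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c,
          Store store, InternalScanner scanner, ScanType scanType, CompactionRequest request)
          throws IOException {
        // With this patch, getCurrentUser() reflects the compaction's
        // effective user whenever one was supplied upstream.
        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
        System.out.println("preCompact running as " + ugi.getShortUserName());
        return scanner;
      }
    }
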
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index dfbd3f4..68ba419 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -34,10 +34,11 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.security.User;

 /**
  * Compact passed set of files. Create an instance and then call
- * {@link #compact(CompactionRequest, CompactionThroughputController)}
+ * {@link #compact(CompactionRequest, CompactionThroughputController, User)}
  */
 @InterfaceAudience.Private
 public class DefaultCompactor extends Compactor {
@@ -51,7 +52,7 @@ public class DefaultCompactor extends Compactor {
    * Do a minor/major compaction on an explicit set of storefiles from a Store.
    */
   public List<Path> compact(final CompactionRequest request,
-      CompactionThroughputController throughputController) throws IOException {
+      CompactionThroughputController throughputController, User user) throws IOException {
     FileDetails fd = getFileDetails(request.getFiles(), request.isMajor());
     this.progress = new CompactionProgress(fd.maxKeyCount);
@@ -84,11 +85,11 @@ public class DefaultCompactor extends Compactor {
       /* Include deletes, unless we are doing a major compaction */
       ScanType scanType = request.isMajor()
           ? ScanType.COMPACT_DROP_DELETES : ScanType.COMPACT_RETAIN_DELETES;
-      scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);
+      scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners, user);
       if (scanner == null) {
         scanner = createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs);
       }
-      scanner = postCreateCoprocScanner(request, scanType, scanner);
+      scanner = postCreateCoprocScanner(request, scanType, scanner, user);
       if (scanner == null) {
         // NULL scanner returned from coprocessor hooks means skip normal processing.
         return newFiles;
@@ -144,7 +145,7 @@ public class DefaultCompactor extends Compactor {
   /**
    * Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to
-   * {@link #compact(CompactionRequest, CompactionThroughputController)};
+   * {@link #compact(CompactionRequest, CompactionThroughputController, User)};
    * @param filesToCompact the files to compact. These are used as the compactionSelection for
    *   the generated {@link CompactionRequest}.
    * @param isMajor true to major compact (prune all deletes, max versions, etc)
@@ -156,6 +157,6 @@ public class DefaultCompactor extends Compactor {
       throws IOException {
     CompactionRequest cr = new CompactionRequest(filesToCompact);
     cr.setIsMajor(isMajor);
-    return this.compact(cr, NoLimitCompactionThroughputController.INSTANCE);
+    return this.compact(cr, NoLimitCompactionThroughputController.INSTANCE, null);
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
index eed5d96..ebd20dd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreUtils;
 import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
 import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ConcatenatedLists;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -390,6 +391,10 @@ public class StripeCompactionPolicy extends CompactionPolicy {
     protected CompactionRequest request;
     protected byte[] majorRangeFromRow = null, majorRangeToRow = null;

+    public List<Path> execute(StripeCompactor compactor,
+        CompactionThroughputController throughputController) throws IOException {
+      return execute(compactor, throughputController, null);
+    }
     /**
      * Executes the request against compactor (essentially, just calls correct overload of
      * compact method), to simulate more dynamic dispatch.
@@ -397,7 +402,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
      * @return result of compact(...)
      */
     public abstract List<Path> execute(StripeCompactor compactor,
-        CompactionThroughputController throughputController) throws IOException;
+        CompactionThroughputController throughputController, User user) throws IOException;

     public StripeCompactionRequest(CompactionRequest request) {
       this.request = request;
@@ -449,9 +454,9 @@ public class StripeCompactionPolicy extends CompactionPolicy {

     @Override
     public List<Path> execute(StripeCompactor compactor,
-        CompactionThroughputController throughputController) throws IOException {
+        CompactionThroughputController throughputController, User user) throws IOException {
       return compactor.compact(this.request, this.targetBoundaries, this.majorRangeFromRow,
-        this.majorRangeToRow, throughputController);
+        this.majorRangeToRow, throughputController, user);
     }
   }
@@ -500,9 +505,9 @@ public class StripeCompactionPolicy extends CompactionPolicy {

     @Override
     public List<Path> execute(StripeCompactor compactor,
-        CompactionThroughputController throughputController) throws IOException {
+        CompactionThroughputController throughputController, User user) throws IOException {
       return compactor.compact(this.request, this.targetCount, this.targetKvs, this.startRow,
-        this.endRow, this.majorRangeFromRow, this.majorRangeToRow, throughputController);
+        this.endRow, this.majorRangeFromRow, this.majorRangeToRow, throughputController, user);
     }

     /** Set major range of the compaction to the entire compaction range.
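
DefaultCompactor's public compact() changes signature outright (its only callers, DefaultStoreEngine and compactForTesting, are both updated in this patch), while StripeCompactionPolicy keeps a concrete two-argument execute() that forwards null. Caller-side view of the new DefaultCompactor entry point under the same null-user convention (a standalone sketch):

    import java.io.IOException;
    import java.util.Collection;
    import java.util.List;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.regionserver.StoreFile;
    import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
    import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
    import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;

    final class DefaultCompactorCallSketch {
      // The trailing null User reproduces the pre-patch behaviour, exactly
      // as compactForTesting does above.
      static List<Path> compactAll(DefaultCompactor compactor, Collection<StoreFile> files)
          throws IOException {
        CompactionRequest cr = new CompactionRequest(files);
        return compactor.compact(cr, NoLimitCompactionThroughputController.INSTANCE, null);
      }
    }
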
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index 342ecce..630ff2d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;

 /**
@@ -51,8 +52,14 @@ public class StripeCompactor extends Compactor {
   }

   public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries,
+      byte[] majorRangeFromRow, byte[] majorRangeToRow,
+      CompactionThroughputController throughputController) throws IOException {
+    return compact(request, targetBoundaries, majorRangeFromRow, majorRangeToRow,
+      throughputController, null);
+  }
+
+  public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries,
       byte[] majorRangeFromRow, byte[] majorRangeToRow,
-      CompactionThroughputController throughputController) throws IOException {
+      CompactionThroughputController throughputController, User user) throws IOException {
     if (LOG.isDebugEnabled()) {
       StringBuilder sb = new StringBuilder();
       sb.append("Executing compaction with " + targetBoundaries.size() + " boundaries:");
@@ -64,12 +71,19 @@ public class StripeCompactor extends Compactor {
     StripeMultiFileWriter writer = new StripeMultiFileWriter.BoundaryMultiWriter(
         targetBoundaries, majorRangeFromRow, majorRangeToRow);
     return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow,
-        throughputController);
+        throughputController, user);
+  }
+
+  public List<Path> compact(CompactionRequest request, int targetCount, long targetSize,
+      byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow,
+      CompactionThroughputController throughputController) throws IOException {
+    return compact(request, targetCount, targetSize, left, right, majorRangeFromRow,
+      majorRangeToRow, throughputController, null);
   }

   public List<Path> compact(CompactionRequest request, int targetCount, long targetSize,
       byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow,
-      CompactionThroughputController throughputController) throws IOException {
+      CompactionThroughputController throughputController, User user) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Executing compaction with " + targetSize + " target file size, no more than "
           + targetCount + " files, in ["
@@ -78,12 +92,12 @@ public class StripeCompactor extends Compactor {
     StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(
         targetCount, targetSize, left, right);
     return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow,
-        throughputController);
+        throughputController, user);
   }

   private List<Path> compactInternal(StripeMultiFileWriter mw, final CompactionRequest request,
       byte[] majorRangeFromRow, byte[] majorRangeToRow,
-      CompactionThroughputController throughputController) throws IOException {
+      CompactionThroughputController throughputController, User user) throws IOException {
     final Collection<StoreFile> filesToCompact = request.getFiles();
     final FileDetails fd = getFileDetails(filesToCompact, request.isMajor());
     this.progress = new CompactionProgress(fd.maxKeyCount);
@@ -97,7 +111,7 @@ public class StripeCompactor extends Compactor {
     try {
       // Get scanner to use.
       ScanType coprocScanType = ScanType.COMPACT_RETAIN_DELETES;
-      scanner = preCreateCoprocScanner(request, coprocScanType, fd.earliestPutTs, scanners);
+      scanner = preCreateCoprocScanner(request, coprocScanType, fd.earliestPutTs, scanners, user);
       if (scanner == null) {
         scanner = (majorRangeFromRow == null)
           ? createScanner(store, scanners, smallestReadPoint, fd.earliestPutTs)
           : createScanner(store, scanners, smallestReadPoint, fd.earliestPutTs,
              majorRangeFromRow, majorRangeToRow);
       }
-      scanner = postCreateCoprocScanner(request, coprocScanType, scanner);
+      scanner = postCreateCoprocScanner(request, coprocScanType, scanner, user);
       if (scanner == null) {
         // NULL scanner returned from coprocessor hooks means skip normal processing.
         return new ArrayList<Path>();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
index 19bde79..f238096 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
@@ -233,7 +234,15 @@ public class TestRegionObserverScannerOpenHook {
       boolean ret = super.compact(compaction, store, throughputController);
       if (ret) compactionStateChangeLatch.countDown();
       return ret;
-    }
+    }
+
+    @Override
+    public boolean compact(CompactionContext compaction, Store store,
+        CompactionThroughputController throughputController, User user) throws IOException {
+      boolean ret = super.compact(compaction, store, throughputController, user);
+      if (ret) compactionStateChangeLatch.countDown();
+      return ret;
+    }
   }

   /**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/StatefulStoreMockMaker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/StatefulStoreMockMaker.java
index dd9c037..0526462 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/StatefulStoreMockMaker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/StatefulStoreMockMaker.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.*;

 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.security.User;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -56,6 +57,8 @@ public class StatefulStoreMockMaker {
     Store store = mock(Store.class, name);
     when(store.requestCompaction(
       anyInt(), isNull(CompactionRequest.class))).then(new SelectAnswer());
+    when(store.requestCompaction(
+      anyInt(), isNull(CompactionRequest.class), any(User.class))).then(new SelectAnswer());
     when(store.getCompactPriority()).then(new PriorityAnswer());
     doAnswer(new CancelAnswer()).when(
       store).cancelRequestedCompaction(any(CompactionContext.class));
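
Because production code now calls the three-argument requestCompaction(), test doubles have to stub both overloads, as StatefulStoreMockMaker does above. A self-contained version of that stubbing (Mockito 1.x style, matching the test code in this patch):

    import static org.mockito.Matchers.any;
    import static org.mockito.Matchers.anyInt;
    import static org.mockito.Matchers.isNull;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import org.apache.hadoop.hbase.regionserver.Store;
    import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
    import org.apache.hadoop.hbase.security.User;

    final class StoreMockSketch {
      // Stub both overloads so the mock behaves the same whichever entry
      // point the code under test happens to use.
      static Store mockStore() throws Exception {
        Store store = mock(Store.class);
        when(store.requestCompaction(anyInt(), isNull(CompactionRequest.class)))
            .thenReturn(null);
        when(store.requestCompaction(anyInt(), isNull(CompactionRequest.class), any(User.class)))
            .thenReturn(null);
        return store;
      }
    }
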
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
index d19e428..9aa0142 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -368,6 +369,13 @@
       finishCompaction(this.selectedFiles);
       return new ArrayList<Path>();
     }
+
+    @Override
+    public List<Path> compact(CompactionThroughputController throughputController,
+        User user) throws IOException {
+      finishCompaction(this.selectedFiles);
+      return new ArrayList<Path>();
+    }
   }

   @Override
@@ -417,6 +425,12 @@
     @Override
     public List<Path> compact(CompactionThroughputController throughputController)
         throws IOException {
+      return compact(throughputController, null);
+    }
+
+    @Override
+    public List<Path> compact(CompactionThroughputController throughputController, User user)
+        throws IOException {
       try {
         isInCompact = true;
         synchronized (this) {
@@ -496,7 +510,7 @@
     HRegion r = mock(HRegion.class);
     when(
       r.compact(any(CompactionContext.class), any(Store.class),
-        any(CompactionThroughputController.class))).then(new Answer<Boolean>() {
+        any(CompactionThroughputController.class), any(User.class))).then(new Answer<Boolean>() {
       public Boolean answer(InvocationOnMock invocation) throws Throwable {
         ((CompactionContext)invocation.getArguments()[0]).compact(
           (CompactionThroughputController)invocation.getArguments()[2]);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
index 4e002de..e17cb70 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
 import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -75,7 +76,8 @@ public class TestStripeStoreEngine {
     when(
       mockCompactor.compact(any(CompactionRequest.class), anyInt(), anyLong(), any(byte[].class),
         any(byte[].class), any(byte[].class), any(byte[].class),
-        any(CompactionThroughputController.class))).thenReturn(new ArrayList<Path>());
+        any(CompactionThroughputController.class), any(User.class)))
+        .thenReturn(new ArrayList<Path>());

     // Produce 3 L0 files.
     StoreFile sf = createFile();
@@ -96,7 +98,7 @@ public class TestStripeStoreEngine {
     compaction.compact(NoLimitCompactionThroughputController.INSTANCE);
     verify(mockCompactor, times(1)).compact(compaction.getRequest(), targetCount, 0L,
       StripeStoreFileManager.OPEN_KEY, StripeStoreFileManager.OPEN_KEY, null, null,
-      NoLimitCompactionThroughputController.INSTANCE);
+      NoLimitCompactionThroughputController.INSTANCE, null);
   }

   private static StoreFile createFile() throws Exception {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
index 33c28a1..9d2e674 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager;
 import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
 import org.apache.hadoop.hbase.regionserver.TestStripeCompactor.StoreFileWritersCapture;
 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ConcatenatedLists;
@@ -210,10 +211,10 @@ public class TestStripeCompactionPolicy {
     assertTrue(policy.needsCompactions(si, al()));
     StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
     assertEquals(si.getStorefiles(), scr.getRequest().getFiles());
-    scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE);
+    scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
     verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY),
       aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY),
-      any(NoLimitCompactionThroughputController.class));
+      any(NoLimitCompactionThroughputController.class), any(User.class));
   }

   @Test
@@ -455,7 +456,7 @@ public class TestStripeCompactionPolicy {
     // All the Stripes are expired, so the Compactor will not create any Writers. We need to create
     // an empty file to preserve metadata
     StripeCompactor sc = createCompactor();
-    List<Path> paths = scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE);
+    List<Path> paths = scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
     assertEquals(1, paths.size());
   }
@@ -514,7 +515,7 @@ public class TestStripeCompactionPolicy {
     assertTrue(policy.needsCompactions(si, al()));
     StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
     verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
-    scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE);
+    scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
     verify(sc, times(1)).compact(eq(scr.getRequest()),
       argThat(new ArgumentMatcher<List<StoreFile>>() {
         @Override
         public boolean matches(Object argument) {
@@ -528,7 +529,7 @@ public class TestStripeCompactionPolicy {
       }
     }), dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom),
       dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo),
-      any(NoLimitCompactionThroughputController.class));
+      any(NoLimitCompactionThroughputController.class), any(User.class));
   }

   /**
@@ -549,12 +550,12 @@ public class TestStripeCompactionPolicy {
     assertTrue(!needsCompaction || policy.needsCompactions(si, al()));
     StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
     verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
-    scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE);
+    scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE, null);
     verify(sc, times(1)).compact(eq(scr.getRequest()),
       count == null ? anyInt() : eq(count.intValue()),
       size == null ? anyLong() : eq(size.longValue()),
       aryEq(start), aryEq(end),
       dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end),
-      any(NoLimitCompactionThroughputController.class));
+      any(NoLimitCompactionThroughputController.class), any(User.class));
   }

   /** Verify arbitrary flush. */
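
One subtlety the verify(...) changes above rely on: with the Mockito 1.x line in use here, any(User.class) also matches a null argument, which is exactly what these call paths pass when no user is set. A minimal demonstration:

    import static org.mockito.Matchers.any;
    import static org.mockito.Matchers.isNull;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.verify;

    import org.apache.hadoop.hbase.security.User;

    final class MatcherDemo {
      interface Target { void call(User user); }

      public static void main(String[] args) {
        Target t = mock(Target.class);
        t.call(null);
        verify(t).call(any(User.class));    // passes in Mockito 1.x: any() accepts null
        verify(t).call(isNull(User.class)); // the stricter, equivalent assertion here
      }
    }
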