diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index 9474ac0..04adf25 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -42,6 +43,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.StealJobQueue; @@ -247,6 +249,13 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi } public synchronized void requestSplit(final Region r, byte[] midKey) { + requestSplit(r, midKey, null); + } + + /** + * The User parameter allows the split thread to assume the correct user identity + */ + public synchronized void requestSplit(final Region r, byte[] midKey, User user) { if (midKey == null) { LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() + " not splittable because midkey=null"); @@ -256,7 +265,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi return; } try { - this.splits.execute(new SplitRequest(r, midKey, this.server, 
user)); if (LOG.isDebugEnabled()) { LOG.debug("Split requested for " + r + ". " + this); } @@ -274,54 +283,55 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi @Override public synchronized List requestCompaction(final Region r, final String why, List> requests) throws IOException { - return requestCompaction(r, why, Store.NO_PRIORITY, requests); + return requestCompaction(r, why, Store.NO_PRIORITY, requests, null); } @Override public synchronized CompactionRequest requestCompaction(final Region r, final Store s, final String why, CompactionRequest request) throws IOException { - return requestCompaction(r, s, why, Store.NO_PRIORITY, request); + return requestCompaction(r, s, why, Store.NO_PRIORITY, request, null); } @Override public synchronized List requestCompaction(final Region r, final String why, - int p, List> requests) throws IOException { - return requestCompactionInternal(r, why, p, requests, true); + int p, List> requests, User user) throws IOException { + return requestCompactionInternal(r, why, p, requests, true, user); } private List requestCompactionInternal(final Region r, final String why, - int p, List> requests, boolean selectNow) throws IOException { + int p, List> requests, boolean selectNow, User user) + throws IOException { // not a special compaction request, so make our own list List ret = null; if (requests == null) { ret = selectNow ? 
new ArrayList(r.getStores().size()) : null; for (Store s : r.getStores()) { - CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow); + CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow, user); if (selectNow) ret.add(cr); } } else { Preconditions.checkArgument(selectNow); // only system requests have selectNow == false ret = new ArrayList(requests.size()); for (Pair pair : requests) { - ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst())); + ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst(), user)); } } return ret; } public CompactionRequest requestCompaction(final Region r, final Store s, - final String why, int priority, CompactionRequest request) throws IOException { - return requestCompactionInternal(r, s, why, priority, request, true); + final String why, int priority, CompactionRequest request, User user) throws IOException { + return requestCompactionInternal(r, s, why, priority, request, true, user); } public synchronized void requestSystemCompaction( final Region r, final String why) throws IOException { - requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false); + requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false, null); } public void requestSystemCompaction( final Region r, final Store s, final String why) throws IOException { - requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false); + requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false, null); } /** @@ -333,7 +343,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi * compaction will be used. 
*/ private synchronized CompactionRequest requestCompactionInternal(final Region r, final Store s, - final String why, int priority, CompactionRequest request, boolean selectNow) + final String why, int priority, CompactionRequest request, boolean selectNow, User user) throws IOException { if (this.server.isStopped() || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) { @@ -350,7 +360,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi // pool; we will do selection there, and move to large pool if necessary. ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize())) ? longCompactions : shortCompactions; - pool.execute(new CompactionRunner(s, r, compaction, pool)); + pool.execute(new CompactionRunner(s, r, compaction, pool, user)); if (LOG.isDebugEnabled()) { String type = (pool == shortCompactions) ? "Small " : "Large "; LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system") @@ -454,9 +464,10 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi private CompactionContext compaction; private int queuedPriority; private ThreadPoolExecutor parent; + private User user; public CompactionRunner(Store store, Region region, - CompactionContext compaction, ThreadPoolExecutor parent) { + CompactionContext compaction, ThreadPoolExecutor parent, User user) { super(); this.store = store; this.region = (HRegion)region; @@ -464,6 +475,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi this.queuedPriority = (this.compaction == null) ? 
store.getCompactPriority() : compaction.getRequest().getPriority(); this.parent = parent; + this.user = user; } @Override @@ -472,13 +484,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi : ("Store = " + store.toString() + ", pri = " + queuedPriority); } - @Override - public void run() { - Preconditions.checkNotNull(server); - if (server.isStopped() - || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) { - return; - } + private void doCompaction() { // Common case - system compaction without a file selection. Select now. if (this.compaction == null) { int oldPriority = this.queuedPriority; @@ -552,6 +558,31 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi this.compaction.getRequest().afterExecute(); } + @Override + public void run() { + Preconditions.checkNotNull(server); + if (server.isStopped() + || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) { + return; + } + if (this.user == null) doCompaction(); + else { + try { + user.getUGI().doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + doCompaction(); + return null; + } + }); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } catch (IOException ioe) { + LOG.error("Encountered exception while compacting", ioe); + } + } + } + private String formatStackTrace(Exception ex) { StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java index 930baf0..a30b526 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java @@ -23,6 +23,7 @@ import java.util.List; import 
org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Pair; @InterfaceAudience.Private @@ -72,12 +73,14 @@ public interface CompactionRequestor { * @param requests custom compaction requests. Each compaction must specify the store on which it * is acting. Can be null in which case a compaction will be attempted on all * stores for the region. + * @param user the effective user * @return The created {@link CompactionRequest CompactionRequests} or an empty list if no * compactions were started. * @throws IOException */ List requestCompaction( - final Region r, final String why, int pri, List> requests + final Region r, final String why, int pri, List> requests, + User user ) throws IOException; /** @@ -87,10 +90,11 @@ public interface CompactionRequestor { * @param pri Priority of this compaction. minHeap. <=0 is critical * @param request custom compaction request to run. {@link Store} and {@link Region} for the * request must match the region and store specified here. 
+ * @param user the effective user * @return The created {@link CompactionRequest} or null if no compaction was started * @throws IOException */ CompactionRequest requestCompaction( - final Region r, final Store s, final String why, int pri, CompactionRequest request + final Region r, final Store s, final String why, int pri, CompactionRequest request, User user ) throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 59d13fa..0ba7b94 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1546,7 +1546,7 @@ public class HRegionServer extends HasThread implements } else { this.instance.compactSplitThread.requestCompaction(r, s, getName() + " requests major compaction; use configured priority", - this.majorCompactPriority, null); + this.majorCompactPriority, null, null); } } } catch (IOException e) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 3c0f50a..d00c65e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -1238,10 +1238,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, String log = "User-triggered " + (major ?
"major " : "") + "compaction" + familyLogMsg; if(family != null) { regionServer.compactSplitThread.requestCompaction(region, store, log, - Store.PRIORITY_USER, null); + Store.PRIORITY_USER, null, RpcServer.getRequestUser()); } else { regionServer.compactSplitThread.requestCompaction(region, log, - Store.PRIORITY_USER, null); + Store.PRIORITY_USER, null, RpcServer.getRequestUser()); } return CompactRegionResponse.newBuilder().build(); } catch (IOException ie) { @@ -1850,7 +1850,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, splitPoint = request.getSplitPoint().toByteArray(); } ((HRegion)region).forceSplit(splitPoint); - regionServer.compactSplitThread.requestSplit(region, ((HRegion)region).checkSplit()); + regionServer.compactSplitThread.requestSplit(region, ((HRegion)region).checkSplit(), + RpcServer.getRequestUser()); return SplitRegionResponse.newBuilder().build(); } catch (DroppedSnapshotException ex) { regionServer.abort("Replay of WAL required. Forcing server shutdown", ex); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java index c40f9b0..7e71727 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java @@ -19,12 +19,14 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.master.TableLockManager.TableLock; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import 
org.apache.hadoop.ipc.RemoteException; @@ -41,13 +43,15 @@ class SplitRequest implements Runnable { private final HRegion parent; private final byte[] midKey; private final HRegionServer server; + private final User user; private TableLock tableLock; - SplitRequest(Region region, byte[] midKey, HRegionServer hrs) { + SplitRequest(Region region, byte[] midKey, HRegionServer hrs, User user) { Preconditions.checkNotNull(hrs); this.parent = (HRegion)region; this.midKey = midKey; this.server = hrs; + this.user = user; } @Override @@ -55,13 +59,7 @@ class SplitRequest implements Runnable { return "regionName=" + parent + ", midKey=" + Bytes.toStringBinary(midKey); } - @Override - public void run() { - if (this.server.isStopping() || this.server.isStopped()) { - LOG.debug("Skipping split because server is stopping=" + - this.server.isStopping() + " or stopped=" + this.server.isStopped()); - return; - } + private void doSplitting() { boolean success = false; server.metricsRegionServer.incrSplitRequest(); long startTime = EnvironmentEdgeManager.currentTime(); @@ -148,6 +146,31 @@ class SplitRequest implements Runnable { } } + @Override + public void run() { + if (this.server.isStopping() || this.server.isStopped()) { + LOG.debug("Skipping split because server is stopping=" + + this.server.isStopping() + " or stopped=" + this.server.isStopped()); + return; + } + if (this.user == null) doSplitting(); + else { + try { + user.getUGI().doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + doSplitting(); + return null; + } + }); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } catch (IOException ioe) { + LOG.error("Encountered exception while splitting", ioe); + } + } + } + protected void releaseTableLock() { if (this.tableLock != null) { try { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java index 122b7a5..a377325 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java @@ -291,7 +291,7 @@ public class TestCompaction { CountDownLatch latch = new CountDownLatch(1); TrackableCompactionRequest request = new TrackableCompactionRequest(latch); - thread.requestCompaction(r, store, "test custom comapction", Store.PRIORITY_USER, request); + thread.requestCompaction(r, store, "test custom comapction", Store.PRIORITY_USER, request, null); // wait for the latch to complete. latch.await(); @@ -327,7 +327,7 @@ public class TestCompaction { } thread.requestCompaction(r, "test mulitple custom comapctions", Store.PRIORITY_USER, - Collections.unmodifiableList(requests)); + Collections.unmodifiableList(requests), null); // wait for the latch to complete. latch.await();