diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index 2a155dd..2766b5b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -201,9 +201,9 @@ public class CompactSplitThread implements CompactionRequestor { } public synchronized void requestRegionsMerge(final HRegion a, - final HRegion b, final boolean forcible, long masterSystemTime) { + final HRegion b, final boolean forcible, long masterSystemTime, User user) { try { - mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible, masterSystemTime)); + mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible, masterSystemTime,user)); if (LOG.isDebugEnabled()) { LOG.debug("Region merge requested for " + a + "," + b + ", forcible=" + forcible + ". " + this); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 63abf0e..bdda1d3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -4207,7 +4207,8 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa long endTime = EnvironmentEdgeManager.currentTimeMillis(); metricsRegionServer.updateFlushTime(endTime - startTime); } - compactSplitThread.requestRegionsMerge(regionA, regionB, forcible, masterSystemTime); + compactSplitThread.requestRegionsMerge(regionA, regionB, forcible, masterSystemTime, + RpcServer.getRequestUser()); return MergeRegionsResponse.newBuilder().build(); } catch (DroppedSnapshotException ex) { abort("Replay of WAL required. 
Forcing server shutdown", ex); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java index 4b6e862..6005aa1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.master.TableLockManager.TableLock; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.util.StringUtils; @@ -43,15 +44,17 @@ class RegionMergeRequest implements Runnable { private final boolean forcible; private TableLock tableLock; private final long masterSystemTime; + private final User user; RegionMergeRequest(HRegion a, HRegion b, HRegionServer hrs, boolean forcible, - long masterSystemTime) { + long masterSystemTime, User user) { Preconditions.checkNotNull(hrs); this.region_a = a; this.region_b = b; this.server = hrs; this.forcible = forcible; this.masterSystemTime = masterSystemTime; + this.user = user; } @Override @@ -87,7 +90,7 @@ class RegionMergeRequest implements Runnable { // the prepare call -- we are not ready to merge just now. Just return. 
if (!mt.prepare(this.server)) return; try { - mt.execute(this.server, this.server); + mt.execute(this.server, this.server, this.user); } catch (Exception e) { if (this.server.isStopping() || this.server.isStopped()) { LOG.info( diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java index 6af8f36..1879dc0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java @@ -23,6 +23,8 @@ import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_MERGING; import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REQUEST_REGION_MERGE; import java.io.IOException; +import java.io.InterruptedIOException; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.List; import java.util.ListIterator; @@ -47,6 +49,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; import org.apache.hadoop.hbase.regionserver.SplitTransaction.LoggingProgressable; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ConfigUtil; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -255,23 +258,47 @@ public class RegionMergeTransaction { */ public HRegion execute(final Server server, final RegionServerServices services) throws IOException { + if (User.isHBaseSecurityEnabled(region_a.getBaseConf())) { + LOG.warn("Should use execute(Server, RegionServerServices, User)"); + } + return execute(server, services, null); + } + + public HRegion execute(final Server server, final RegionServerServices services, User user) + throws 
IOException { useZKForAssignment = server == null ? true : ConfigUtil.useZKForAssignment(server.getConfiguration()); if (rsCoprocessorHost == null) { rsCoprocessorHost = server != null ? ((HRegionServer) server).getCoprocessorHost() : null; } - HRegion mergedRegion = createMergedRegion(server, services); + final HRegion mergedRegion = createMergedRegion(server, services, user); if (rsCoprocessorHost != null) { - rsCoprocessorHost.postMergeCommit(this.region_a, this.region_b, mergedRegion); + if (user == null) { + rsCoprocessorHost.postMergeCommit(this.region_a, this.region_b, mergedRegion); + } else { + try { + user.getUGI().doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + rsCoprocessorHost.postMergeCommit(region_a, region_b, mergedRegion); + return null; + } + }); + } catch (InterruptedException ie) { + InterruptedIOException iioe = new InterruptedIOException(); + iioe.initCause(ie); + throw iioe; + } + } } - return stepsAfterPONR(server, services, mergedRegion); + stepsAfterPONR(server, services, mergedRegion, user); + return mergedRegion; } - public HRegion stepsAfterPONR(final Server server, final RegionServerServices services, - HRegion mergedRegion) throws IOException { + public void stepsAfterPONR(final Server server, final RegionServerServices services, + final HRegion mergedRegion, User user) throws IOException { openMergedRegion(server, services, mergedRegion); - transitionZKNode(server, services, mergedRegion); - return mergedRegion; + transitionZKNode(server, services, mergedRegion, user); } /** @@ -284,7 +311,7 @@ public class RegionMergeTransaction { * {@link #rollback(Server, RegionServerServices)} */ HRegion createMergedRegion(final Server server, - final RegionServerServices services) throws IOException { + final RegionServerServices services, User user) throws IOException { LOG.info("Starting merge of " + region_a + " and " + region_b.getRegionNameAsString() + ", forcible=" + forcible); if ((server != 
null && server.isStopped()) @@ -293,7 +320,24 @@ public class RegionMergeTransaction { } if (rsCoprocessorHost != null) { - if (rsCoprocessorHost.preMerge(this.region_a, this.region_b)) { + boolean ret = false; + if (user == null) { + ret = rsCoprocessorHost.preMerge(region_a, region_b); + } else { + try { + ret = user.getUGI().doAs(new PrivilegedExceptionAction() { + @Override + public Boolean run() throws Exception { + return rsCoprocessorHost.preMerge(region_a, region_b); + } + }); + } catch (InterruptedException ie) { + InterruptedIOException iioe = new InterruptedIOException(); + iioe.initCause(ie); + throw iioe; + } + } + if (ret) { throw new IOException("Coprocessor bypassing regions " + this.region_a + " " + this.region_b + " merge."); } @@ -306,9 +350,27 @@ public class RegionMergeTransaction { HRegion mergedRegion = stepsBeforePONR(server, services, testing); @MetaMutationAnnotation - List metaEntries = new ArrayList(); + final List metaEntries = new ArrayList(); if (rsCoprocessorHost != null) { - if (rsCoprocessorHost.preMergeCommit(this.region_a, this.region_b, metaEntries)) { + boolean ret = false; + if (user == null) { + ret = rsCoprocessorHost.preMergeCommit(region_a, region_b, metaEntries); + } else { + try { + ret = user.getUGI().doAs(new PrivilegedExceptionAction() { + @Override + public Boolean run() throws Exception { + return rsCoprocessorHost.preMergeCommit(region_a, region_b, metaEntries); + } + }); + } catch (InterruptedException ie) { + InterruptedIOException iioe = new InterruptedIOException(); + iioe.initCause(ie); + throw iioe; + } + } + + if (ret) { throw new IOException("Coprocessor bypassing regions " + this.region_a + " " + this.region_b + " merge."); } @@ -609,7 +671,7 @@ public class RegionMergeTransaction { * {@link #rollback(Server, RegionServerServices)} */ void transitionZKNode(final Server server, final RegionServerServices services, - HRegion mergedRegion) throws IOException { + final HRegion mergedRegion, User user) throws 
IOException { if (useZKAndZKIsSet(server)) { // Tell master about merge by updating zk. If we fail, abort. try { @@ -648,7 +710,23 @@ public class RegionMergeTransaction { } if (rsCoprocessorHost != null) { - rsCoprocessorHost.postMerge(this.region_a, this.region_b, mergedRegion); + if (user == null) { + rsCoprocessorHost.postMerge(region_a, region_b, mergedRegion); + } else { + try { + user.getUGI().doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + rsCoprocessorHost.postMerge(region_a, region_b, mergedRegion); + return null; + } + }); + } catch (InterruptedException ie) { + InterruptedIOException iioe = new InterruptedIOException(); + iioe.initCause(ie); + throw iioe; + } + } } // Leaving here, the mergedir with its dross will be in place but since the @@ -771,10 +849,34 @@ public class RegionMergeTransaction { @SuppressWarnings("deprecation") public boolean rollback(final Server server, final RegionServerServices services) throws IOException { + if (User.isHBaseSecurityEnabled(region_a.getBaseConf())) { + LOG.warn("Should use rollback(Server, RegionServerServices, User)"); + } + return rollback(server, services, null); + } + + public boolean rollback(final Server server, + final RegionServerServices services, User user) throws IOException { assert this.mergedRegionInfo != null; // Coprocessor callback if (rsCoprocessorHost != null) { - rsCoprocessorHost.preRollBackMerge(this.region_a, this.region_b); + if (user == null) { + rsCoprocessorHost.preRollBackMerge(region_a, region_b); + } else { + try { + user.getUGI().doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + rsCoprocessorHost.preRollBackMerge(region_a, region_b); + return null; + } + }); + } catch (InterruptedException ie) { + InterruptedIOException iioe = new InterruptedIOException(); + iioe.initCause(ie); + throw iioe; + } + } } boolean result = true; @@ -852,7 +954,23 @@ public class RegionMergeTransaction { } // Coprocessor 
callback if (rsCoprocessorHost != null) { - rsCoprocessorHost.postRollBackMerge(this.region_a, this.region_b); + if (user == null) { + rsCoprocessorHost.postRollBackMerge(region_a, region_b); + } else { + try { + user.getUGI().doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + rsCoprocessorHost.postRollBackMerge(region_a, region_b); + return null; + } + }); + } catch (InterruptedException ie) { + InterruptedIOException iioe = new InterruptedIOException(); + iioe.initCause(ie); + throw iioe; + } + } } return result; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerObserver.java index 7330167..e32dd4a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerObserver.java @@ -166,7 +166,7 @@ public class TestRegionServerObserver { preMergeAfterPONRCalled = true; RegionServerCoprocessorEnvironment environment = ctx.getEnvironment(); HRegionServer rs = (HRegionServer) environment.getRegionServerServices(); - rmt.stepsAfterPONR(rs, rs, this.mergedRegion); + rmt.stepsAfterPONR(rs, rs, this.mergedRegion, null); } @Override