Propagate the requesting user through region merges.

requestRegionsMerge() now receives the RPC caller (RpcServer.getRequestUser())
and hands it to RegionMergeRequest, which passes it on to
RegionMergeTransaction.execute(). When a user is supplied, the region server
coprocessor hooks (preMerge, preMergeCommit, postMergeCommit, postMerge,
preRollBackMerge, postRollBackMerge) are invoked inside
user.getUGI().doAs(...); with a null user they are called directly, as before.
An InterruptedException from doAs() is rethrown as InterruptedIOException with
the original exception as its cause.

The two-argument execute()/rollback() overloads remain; they delegate with a
null user and log a warning when HBase security is enabled.

Interface note: stepsAfterPONR() now returns void and takes a trailing User
argument; execute() returns the merged region itself.

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
index 83d59a6..24ff5c8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
@@ -212,9 +212,9 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
   }
 
   public synchronized void requestRegionsMerge(final HRegion a,
-      final HRegion b, final boolean forcible, long masterSystemTime) {
+      final HRegion b, final boolean forcible, long masterSystemTime, User user) {
     try {
-      mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible, masterSystemTime));
+      mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible, masterSystemTime,user));
       if (LOG.isDebugEnabled()) {
         LOG.debug("Region merge requested for " + a + "," + b + ", forcible="
             + forcible + ". " + this);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 540f98a..5a18db1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1249,7 +1249,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
       }
       regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible,
-          masterSystemTime);
+          masterSystemTime, RpcServer.getRequestUser());
       return MergeRegionsResponse.newBuilder().build();
     } catch (DroppedSnapshotException ex) {
       regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java
index d6e3818..da1dbc7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.DroppedSnapshotException;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.util.StringUtils;
 
@@ -43,15 +44,17 @@ class RegionMergeRequest implements Runnable {
   private final boolean forcible;
   private TableLock tableLock;
   private final long masterSystemTime;
+  private final User user;
 
   RegionMergeRequest(HRegion a, HRegion b, HRegionServer hrs, boolean forcible,
-    long masterSystemTime) {
+    long masterSystemTime, User user) {
     Preconditions.checkNotNull(hrs);
     this.region_a = a;
     this.region_b = b;
     this.server = hrs;
     this.forcible = forcible;
     this.masterSystemTime = masterSystemTime;
+    this.user = user;
   }
 
   @Override
@@ -87,7 +90,7 @@ class RegionMergeRequest implements Runnable {
       // the prepare call -- we are not ready to merge just now. Just return.
       if (!mt.prepare(this.server)) return;
       try {
-        mt.execute(this.server, this.server);
+        mt.execute(this.server, this.server, this.user);
       } catch (Exception e) {
         if (this.server.isStopping() || this.server.isStopped()) {
           LOG.info(
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java
index b184b3d..8766037 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.ListIterator;
 
@@ -42,6 +44,7 @@ import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
 import org.apache.hadoop.hbase.coordination.RegionMergeCoordination.RegionMergeDetails;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
 import org.apache.hadoop.hbase.regionserver.SplitTransaction.LoggingProgressable;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ConfigUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -244,7 +247,15 @@ public class RegionMergeTransaction {
    * @see #rollback(Server, RegionServerServices)
    */
   public HRegion execute(final Server server,
       final RegionServerServices services) throws IOException {
+    if (User.isHBaseSecurityEnabled(region_a.getBaseConf())) {
+      LOG.warn("Should use execute(Server, RegionServerServices, User)");
+    }
+    return execute(server, services, null);
+  }
+
+  public HRegion execute(final Server server, final RegionServerServices services, User user)
+      throws IOException {
     useCoordinationForAssignment =
         server == null ? true : ConfigUtil.useZKForAssignment(server.getConfiguration());
     if (rmd == null) {
@@ -257,15 +268,32 @@ public class RegionMergeTransaction {
       rsCoprocessorHost = server != null ?
         ((HRegionServer) server).getRegionServerCoprocessorHost() : null;
     }
-    HRegion mergedRegion = createMergedRegion(server, services);
+    final HRegion mergedRegion = createMergedRegion(server, services, user);
     if (rsCoprocessorHost != null) {
-      rsCoprocessorHost.postMergeCommit(this.region_a, this.region_b, mergedRegion);
+      if (user == null) {
+        rsCoprocessorHost.postMergeCommit(this.region_a, this.region_b, mergedRegion);
+      } else {
+        try {
+          user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+              rsCoprocessorHost.postMergeCommit(region_a, region_b, mergedRegion);
+              return null;
+            }
+          });
+        } catch (InterruptedException ie) {
+          InterruptedIOException iioe = new InterruptedIOException();
+          iioe.initCause(ie);
+          throw iioe;
+        }
+      }
     }
-    return stepsAfterPONR(server, services, mergedRegion);
+    stepsAfterPONR(server, services, mergedRegion, user);
+    return mergedRegion;
   }
 
-  public HRegion stepsAfterPONR(final Server server, final RegionServerServices services,
-      HRegion mergedRegion) throws IOException {
+  public void stepsAfterPONR(final Server server, final RegionServerServices services,
+      final HRegion mergedRegion, User user) throws IOException {
     openMergedRegion(server, services, mergedRegion);
     if (useCoordination(server)) {
       ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
@@ -273,9 +301,24 @@ public class RegionMergeTransaction {
           region_a, region_b, rmd, mergedRegion);
     }
     if (rsCoprocessorHost != null) {
-      rsCoprocessorHost.postMerge(this.region_a, this.region_b, mergedRegion);
+      if (user == null) {
+        rsCoprocessorHost.postMerge(region_a, region_b, mergedRegion);
+      } else {
+        try {
+          user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+              rsCoprocessorHost.postMerge(region_a, region_b, mergedRegion);
+              return null;
+            }
+          });
+        } catch (InterruptedException ie) {
+          InterruptedIOException iioe = new InterruptedIOException();
+          iioe.initCause(ie);
+          throw iioe;
+        }
+      }
     }
-    return mergedRegion;
   }
 
   /**
@@ -287,7 +330,7 @@ public class RegionMergeTransaction {
    * {@link #rollback(Server, RegionServerServices)}
    */
   HRegion createMergedRegion(final Server server,
-      final RegionServerServices services) throws IOException {
+      final RegionServerServices services, User user) throws IOException {
     LOG.info("Starting merge of " + region_a + " and "
         + region_b.getRegionNameAsString() + ", forcible=" + forcible);
     if ((server != null && server.isStopped())
@@ -296,7 +339,24 @@ public class RegionMergeTransaction {
     }
 
     if (rsCoprocessorHost != null) {
-      if (rsCoprocessorHost.preMerge(this.region_a, this.region_b)) {
+      boolean ret = false;
+      if (user == null) {
+        ret = rsCoprocessorHost.preMerge(region_a, region_b);
+      } else {
+        try {
+          ret = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() {
+            @Override
+            public Boolean run() throws Exception {
+              return rsCoprocessorHost.preMerge(region_a, region_b);
+            }
+          });
+        } catch (InterruptedException ie) {
+          InterruptedIOException iioe = new InterruptedIOException();
+          iioe.initCause(ie);
+          throw iioe;
+        }
+      }
+      if (ret) {
         throw new IOException("Coprocessor bypassing regions " + this.region_a + " "
             + this.region_b + " merge.");
       }
@@ -309,9 +369,27 @@ public class RegionMergeTransaction {
     HRegion mergedRegion = stepsBeforePONR(server, services, testing);
 
     @MetaMutationAnnotation
-    List<Mutation> metaEntries = new ArrayList<Mutation>();
+    final List<Mutation> metaEntries = new ArrayList<Mutation>();
     if (rsCoprocessorHost != null) {
-      if (rsCoprocessorHost.preMergeCommit(this.region_a, this.region_b, metaEntries)) {
+      boolean ret = false;
+      if (user == null) {
+        ret = rsCoprocessorHost.preMergeCommit(region_a, region_b, metaEntries);
+      } else {
+        try {
+          ret = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() {
+            @Override
+            public Boolean run() throws Exception {
+              return rsCoprocessorHost.preMergeCommit(region_a, region_b, metaEntries);
+            }
+          });
+        } catch (InterruptedException ie) {
+          InterruptedIOException iioe = new InterruptedIOException();
+          iioe.initCause(ie);
+          throw iioe;
+        }
+      }
+
+      if (ret) {
         throw new IOException("Coprocessor bypassing regions " + this.region_a + " "
             + this.region_b + " merge.");
       }
@@ -654,10 +732,34 @@ public class RegionMergeTransaction {
   @SuppressWarnings("deprecation")
   public boolean rollback(final Server server,
       final RegionServerServices services) throws IOException {
+    if (User.isHBaseSecurityEnabled(region_a.getBaseConf())) {
+      LOG.warn("Should use rollback(Server, RegionServerServices, User)");
+    }
+    return rollback(server, services, null);
+  }
+
+  public boolean rollback(final Server server,
+      final RegionServerServices services, User user) throws IOException {
     assert this.mergedRegionInfo != null;
     // Coprocessor callback
     if (rsCoprocessorHost != null) {
-      rsCoprocessorHost.preRollBackMerge(this.region_a, this.region_b);
+      if (user == null) {
+        rsCoprocessorHost.preRollBackMerge(region_a, region_b);
+      } else {
+        try {
+          user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+              rsCoprocessorHost.preRollBackMerge(region_a, region_b);
+              return null;
+            }
+          });
+        } catch (InterruptedException ie) {
+          InterruptedIOException iioe = new InterruptedIOException();
+          iioe.initCause(ie);
+          throw iioe;
+        }
+      }
     }
 
     boolean result = true;
@@ -736,7 +838,23 @@ public class RegionMergeTransaction {
     }
     // Coprocessor callback
     if (rsCoprocessorHost != null) {
-      rsCoprocessorHost.postRollBackMerge(this.region_a, this.region_b);
+      if (user == null) {
+        rsCoprocessorHost.postRollBackMerge(region_a, region_b);
+      } else {
+        try {
+          user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+              rsCoprocessorHost.postRollBackMerge(region_a, region_b);
+              return null;
+            }
+          });
+        } catch (InterruptedException ie) {
+          InterruptedIOException iioe = new InterruptedIOException();
+          iioe.initCause(ie);
+          throw iioe;
+        }
+      }
     }
 
     return result;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerObserver.java
index 638321c..244f3fe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerObserver.java
@@ -162,7 +162,7 @@ public class TestRegionServerObserver {
       preMergeAfterPONRCalled = true;
       RegionServerCoprocessorEnvironment environment = ctx.getEnvironment();
       HRegionServer rs = (HRegionServer) environment.getRegionServerServices();
-      rmt.stepsAfterPONR(rs, rs, this.mergedRegion);
+      rmt.stepsAfterPONR(rs, rs, this.mergedRegion, null);
     }
 
     @Override