diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index a5f47d7..bf44eea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -223,9 +223,9 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi } public synchronized void requestRegionsMerge(final Region a, - final Region b, final boolean forcible, long masterSystemTime) { + final Region b, final boolean forcible, long masterSystemTime, User user) { try { - mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible, masterSystemTime)); + mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible, masterSystemTime,user)); if (LOG.isDebugEnabled()) { LOG.debug("Region merge requested for " + a + "," + b + ", forcible=" + forcible + ". " + this); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 2a5eaad..8822696 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -1349,7 +1349,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, regionServer.metricsRegionServer.updateFlushTime(endTime - startTime); } regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible, - masterSystemTime); + masterSystemTime, RpcServer.getRequestUser()); return MergeRegionsResponse.newBuilder().build(); } catch (DroppedSnapshotException ex) { regionServer.abort("Replay of WAL required. Forcing server shutdown", ex); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java index e50b32c..4d806aa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.master.TableLockManager.TableLock; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.util.StringUtils; @@ -43,15 +44,17 @@ class RegionMergeRequest implements Runnable { private final boolean forcible; private TableLock tableLock; private final long masterSystemTime; + private final User user; RegionMergeRequest(Region a, Region b, HRegionServer hrs, boolean forcible, - long masterSystemTime) { + long masterSystemTime, User user) { Preconditions.checkNotNull(hrs); this.region_a = (HRegion)a; this.region_b = (HRegion)b; this.server = hrs; this.forcible = forcible; this.masterSystemTime = masterSystemTime; + this.user = user; } @Override @@ -88,7 +91,7 @@ class RegionMergeRequest implements Runnable { // the prepare call -- we are not ready to merge just now. Just return. if (!mt.prepare(this.server)) return; try { - mt.execute(this.server, this.server); + mt.execute(this.server, this.server, this.user); } catch (Exception e) { if (this.server.isStopping() || this.server.isStopped()) { LOG.info( diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java index b9533ad..46aaedf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.security.User; /** * Executes region merge as a "transaction". It is similar with @@ -170,10 +171,25 @@ public interface RegionMergeTransaction { * @return merged region * @throws IOException * @see #rollback(Server, RegionServerServices) + * @deprecated use #execute(Server, RegionServerServices, User) */ + @Deprecated Region execute(Server server, RegionServerServices services) throws IOException; /** + * Run the transaction. + * @param server Hosting server instance. Can be null when testing + * @param services Used to online/offline regions. + * @param user + * @throws IOException If thrown, transaction failed. Call + * {@link #rollback(Server, RegionServerServices)} + * @return merged region + * @throws IOException + * @see #rollback(Server, RegionServerServices, User) + */ + Region execute(Server server, RegionServerServices services, User user) throws IOException; + + /** * Roll back a failed transaction * @param server Hosting server instance (May be null when testing). * @param services Services of regionserver, used to online regions. @@ -181,10 +197,24 @@ public interface RegionMergeTransaction { * @return True if we successfully rolled back, false if we got to the point * of no return and so now need to abort the server to minimize * damage. + * @deprecated use #rollback(Server, RegionServerServices, User) */ + @Deprecated boolean rollback(Server server, RegionServerServices services) throws IOException; /** + * Roll back a failed transaction + * @param server Hosting server instance (May be null when testing). + * @param services Services of regionserver, used to online regions. + * @param user + * @throws IOException If thrown, rollback failed. Take drastic action. + * @return True if we successfully rolled back, false if we got to the point + * of no return and so now need to abort the server to minimize + * damage. + */ + boolean rollback(Server server, RegionServerServices services, User user) throws IOException; + + /** * Register a listener for transaction preparation, execution, and possibly * rollback phases. *

A listener can abort a transaction by throwing an exception. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java index 514d2d5..b42fa55 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.io.InterruptedIOException; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.List; import java.util.ListIterator; @@ -42,6 +44,7 @@ import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; import org.apache.hadoop.hbase.coordination.RegionMergeCoordination.RegionMergeDetails; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; import org.apache.hadoop.hbase.regionserver.SplitTransactionImpl.LoggingProgressable; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ConfigUtil; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -249,6 +252,15 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction { @Override public HRegion execute(final Server server, final RegionServerServices services) throws IOException { + if (User.isHBaseSecurityEnabled(region_a.getBaseConf())) { + LOG.warn("Should use execute(Server, RegionServerServices, User)"); + } + return execute(server, services, null); + } + + @Override + public HRegion execute(final Server server, final RegionServerServices services, User user) + throws IOException { this.server = server; this.rsServices = services; useCoordinationForAssignment = @@ -263,19 +275,35 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction { rsCoprocessorHost = server != null ? ((HRegionServer) server).getRegionServerCoprocessorHost() : null; } - HRegion mergedRegion = createMergedRegion(server, services); + final HRegion mergedRegion = createMergedRegion(server, services, user); if (rsCoprocessorHost != null) { - rsCoprocessorHost.postMergeCommit(this.region_a, this.region_b, mergedRegion); + if (user == null) { + rsCoprocessorHost.postMergeCommit(this.region_a, this.region_b, mergedRegion); + } else { + try { + user.getUGI().doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + rsCoprocessorHost.postMergeCommit(region_a, region_b, mergedRegion); + return null; + } + }); + } catch (InterruptedException ie) { + InterruptedIOException iioe = new InterruptedIOException(); + iioe.initCause(ie); + throw iioe; + } + } } - mergedRegion = stepsAfterPONR(server, services, mergedRegion); + stepsAfterPONR(server, services, mergedRegion, user); transition(RegionMergeTransactionPhase.COMPLETED); return mergedRegion; } - public HRegion stepsAfterPONR(final Server server, final RegionServerServices services, - HRegion mergedRegion) throws IOException { + public void stepsAfterPONR(final Server server, final RegionServerServices services, + final HRegion mergedRegion, User user) throws IOException { openMergedRegion(server, services, mergedRegion); if (useCoordination(server)) { ((BaseCoordinatedStateManager) server.getCoordinatedStateManager()) @@ -283,9 +311,24 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction { mergedRegionInfo, region_a, region_b, rmd, mergedRegion); } if (rsCoprocessorHost != null) { - rsCoprocessorHost.postMerge(this.region_a, this.region_b, mergedRegion); + if (user == null) { + rsCoprocessorHost.postMerge(region_a, region_b, mergedRegion); + } else { + try { + user.getUGI().doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + rsCoprocessorHost.postMerge(region_a, region_b, mergedRegion); + return null; + } + }); + } catch (InterruptedException ie) { + InterruptedIOException iioe = new InterruptedIOException(); + iioe.initCause(ie); + throw iioe; + } + } } - return mergedRegion; } /** @@ -297,7 +340,7 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction { * {@link #rollback(Server, RegionServerServices)} */ HRegion createMergedRegion(final Server server, - final RegionServerServices services) throws IOException { + final RegionServerServices services, User user) throws IOException { LOG.info("Starting merge of " + region_a + " and " + region_b.getRegionInfo().getRegionNameAsString() + ", forcible=" + forcible); if ((server != null && server.isStopped()) @@ -306,7 +349,24 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction { } if (rsCoprocessorHost != null) { - if (rsCoprocessorHost.preMerge(this.region_a, this.region_b)) { + boolean ret = false; + if (user == null) { + ret = rsCoprocessorHost.preMerge(region_a, region_b); + } else { + try { + ret = user.getUGI().doAs(new PrivilegedExceptionAction() { + @Override + public Boolean run() throws Exception { + return rsCoprocessorHost.preMerge(region_a, region_b); + } + }); + } catch (InterruptedException ie) { + InterruptedIOException iioe = new InterruptedIOException(); + iioe.initCause(ie); + throw iioe; + } + } + if (ret) { throw new IOException("Coprocessor bypassing regions " + this.region_a + " " + this.region_b + " merge."); } @@ -319,9 +379,27 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction { HRegion mergedRegion = stepsBeforePONR(server, services, testing); @MetaMutationAnnotation - List metaEntries = new ArrayList(); + final List metaEntries = new ArrayList(); if (rsCoprocessorHost != null) { - if (rsCoprocessorHost.preMergeCommit(this.region_a, this.region_b, metaEntries)) { + boolean ret = false; + if (user == null) { + ret = rsCoprocessorHost.preMergeCommit(region_a, region_b, metaEntries); + } else { + try { + ret = user.getUGI().doAs(new PrivilegedExceptionAction() { + @Override + public Boolean run() throws Exception { + return rsCoprocessorHost.preMergeCommit(region_a, region_b, metaEntries); + } + }); + } catch (InterruptedException ie) { + InterruptedIOException iioe = new InterruptedIOException(); + iioe.initCause(ie); + throw iioe; + } + } + + if (ret) { throw new IOException("Coprocessor bypassing regions " + this.region_a + " " + this.region_b + " merge."); } @@ -685,10 +763,35 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction { @SuppressWarnings("deprecation") public boolean rollback(final Server server, final RegionServerServices services) throws IOException { + if (User.isHBaseSecurityEnabled(region_a.getBaseConf())) { + LOG.warn("Should use execute(Server, RegionServerServices, User)"); + } + return rollback(server, services, null); + } + + @Override + public boolean rollback(final Server server, + final RegionServerServices services, User user) throws IOException { assert this.mergedRegionInfo != null; // Coprocessor callback if (rsCoprocessorHost != null) { - rsCoprocessorHost.preRollBackMerge(this.region_a, this.region_b); + if (user == null) { + rsCoprocessorHost.preRollBackMerge(region_a, region_b); + } else { + try { + user.getUGI().doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + rsCoprocessorHost.preRollBackMerge(region_a, region_b); + return null; + } + }); + } catch (InterruptedException ie) { + InterruptedIOException iioe = new InterruptedIOException(); + iioe.initCause(ie); + throw iioe; + } + } } boolean result = true; @@ -776,7 +879,23 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction { } // Coprocessor callback if (rsCoprocessorHost != null) { - rsCoprocessorHost.postRollBackMerge(this.region_a, this.region_b); + if (user == null) { + rsCoprocessorHost.postRollBackMerge(region_a, region_b); + } else { + try { + user.getUGI().doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + rsCoprocessorHost.postRollBackMerge(region_a, region_b); + return null; + } + }); + } catch (InterruptedException ie) { + InterruptedIOException iioe = new InterruptedIOException(); + iioe.initCause(ie); + throw iioe; + } + } } return result; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerObserver.java index 65ca117..ad57b3f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerObserver.java @@ -166,7 +166,7 @@ public class TestRegionServerObserver { preMergeAfterPONRCalled = true; RegionServerCoprocessorEnvironment environment = ctx.getEnvironment(); HRegionServer rs = (HRegionServer) environment.getRegionServerServices(); - rmt.stepsAfterPONR(rs, rs, this.mergedRegion); + rmt.stepsAfterPONR(rs, rs, this.mergedRegion, null); } @Override