diff --git security/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java security/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java index b82ad5d..90324f3 100644 --- security/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java +++ security/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java @@ -46,6 +46,8 @@ import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.MasterObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionServerObserver; import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.WritableByteArrayComparable; @@ -62,6 +64,7 @@ import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.access.Permission.Action; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; @@ -101,7 +104,7 @@ import com.google.common.collect.Sets; *

*/ public class AccessController extends BaseRegionObserver - implements MasterObserver, AccessControllerProtocol { + implements MasterObserver, RegionServerObserver, AccessControllerProtocol { /** * Represents the result of an authorization check for logging and error * reporting. @@ -510,18 +513,32 @@ public class AccessController extends BaseRegionObserver } /* ---- MasterObserver implementation ---- */ - public void start(CoprocessorEnvironment env) throws IOException { - // if running on HMaster + public void start(CoprocessorEnvironment env) throws IOException { + + ZooKeeperWatcher zk = null; if (env instanceof MasterCoprocessorEnvironment) { - MasterCoprocessorEnvironment e = (MasterCoprocessorEnvironment)env; - this.authManager = TableAuthManager.get( - e.getMasterServices().getZooKeeper(), - e.getConfiguration()); + // if running on HMaster + MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment) env; + zk = mEnv.getMasterServices().getZooKeeper(); + } else if (env instanceof RegionServerCoprocessorEnvironment) { + RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment) env; + zk = rsEnv.getRegionServerServices().getZooKeeper(); + } else if (env instanceof RegionCoprocessorEnvironment) { + // if running at region + regionEnv = (RegionCoprocessorEnvironment) env; + zk = regionEnv.getRegionServerServices().getZooKeeper(); } - // if running at region - if (env instanceof RegionCoprocessorEnvironment) { - regionEnv = (RegionCoprocessorEnvironment)env; + // If zk is null or IOException while obtaining auth manager, + // throw RuntimeException so that the coprocessor is unloaded. + if (zk != null) { + try { + this.authManager = TableAuthManager.get(zk, env.getConfiguration()); + } catch (IOException ioe) { + throw new RuntimeException("Error obtaining TableAuthManager", ioe); + } + } else { + throw new RuntimeException("Error obtaining TableAuthManager, zk found null."); } } @@ -707,27 +724,34 @@ public class AccessController extends BaseRegionObserver /* ---- RegionObserver implementation ---- */ @Override - public void postOpen(ObserverContext c) { - RegionCoprocessorEnvironment e = c.getEnvironment(); - final HRegion region = e.getRegion(); + public void preOpen(ObserverContext e) throws IOException { + RegionCoprocessorEnvironment env = e.getEnvironment(); + final HRegion region = env.getRegion(); if (region == null) { - LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()"); + LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()"); return; + } else { + HRegionInfo regionInfo = region.getRegionInfo(); + if (isSpecialTable(regionInfo)) { + isSystemOrSuperUser(regionEnv.getConfiguration()); + } else { + requirePermission(Action.ADMIN); + } } + } - try { - this.authManager = TableAuthManager.get( - e.getRegionServerServices().getZooKeeper(), - regionEnv.getConfiguration()); - } catch (IOException ioe) { - // pass along as a RuntimeException, so that the coprocessor is unloaded - throw new RuntimeException("Error obtaining TableAuthManager", ioe); + @Override + public void postOpen(ObserverContext c) { + RegionCoprocessorEnvironment env = c.getEnvironment(); + final HRegion region = env.getRegion(); + if (region == null) { + LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()"); + return; } - if (AccessControlLists.isAclRegion(region)) { aclRegion = true; try { - initialize(e); + initialize(env); } catch (IOException ex) { // if we can't obtain permissions, it's better to fail // than perform checks incorrectly @@ -1127,4 +1151,55 @@ public class AccessController extends BaseRegionObserver } return tableName; } + + + @Override + public void preClose(ObserverContext e, boolean abortRequested) + throws IOException { + requirePermission(Permission.Action.ADMIN); + } + + @Override + public void preLockRow(ObserverContext ctx, byte[] regionName, + byte[] row) throws IOException { + requirePermission(getTableName(ctx.getEnvironment()), null, null, TablePermission.Action.WRITE, + TablePermission.Action.CREATE); + } + + @Override + public void preUnlockRow(ObserverContext ctx, byte[] regionName, + long lockId) throws IOException { + requirePermission(getTableName(ctx.getEnvironment()), null, null, Action.WRITE, Action.CREATE); + } + + private void isSystemOrSuperUser(Configuration conf) throws IOException { + User user = User.getCurrent(); + if (user == null) { + throw new IOException("Unable to obtain the current user, " + + "authorization checks for internal operations will not work correctly!"); + } + + String currentUser = user.getShortName(); + List superusers = Lists.asList(currentUser, + conf.getStrings(AccessControlLists.SUPERUSER_CONF_KEY, new String[0])); + + User activeUser = getActiveUser(); + if (!(superusers.contains(activeUser.getShortName()))) { + throw new AccessDeniedException("User '" + (user != null ? user.getShortName() : "null") + + "is not system or super user."); + } + } + + private boolean isSpecialTable(HRegionInfo regionInfo) { + byte[] tableName = regionInfo.getTableName(); + return tableName.equals(AccessControlLists.ACL_TABLE_NAME) + || tableName.equals(Bytes.toBytes("-ROOT-")) || tableName.equals(Bytes.toBytes(".META.")); + } + + @Override + public void preStopRegionServer(ObserverContext env) + throws IOException { + requirePermission(Permission.Action.ADMIN); + } + } diff --git security/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java security/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java index 1a087b6..4aa0e18 100644 --- security/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java +++ security/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.security.access; import java.io.IOException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.ipc.SecureRpcEngine; import org.apache.hadoop.hbase.security.User; @@ -32,8 +33,9 @@ public class SecureTestUtil { conf.set("hadoop.security.authorization", "false"); conf.set("hadoop.security.authentication", "simple"); conf.set("hbase.rpc.engine", SecureRpcEngine.class.getName()); - conf.set("hbase.coprocessor.master.classes", AccessController.class.getName()); - conf.set("hbase.coprocessor.region.classes", AccessController.class.getName()); + conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, AccessController.class.getName()); + conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, AccessController.class.getName()); + conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, AccessController.class.getName()); // add the process running user to superusers String currentUser = User.getCurrent().getName(); conf.set("hbase.superuser", "admin,"+currentUser); diff --git security/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java security/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java index 765f0af..ad6dda4 100644 --- security/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java +++ security/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.LargeTests; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.UnknownRowLockException; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; @@ -51,9 +52,11 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorException; import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; +import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost; import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.access.Permission.Action; @@ -79,6 +82,8 @@ public class TestAccessController { private static User USER_ADMIN; // user with rw permissions private static User USER_RW; + // user with rw permissions on table. + private static User USER_RW_ON_TABLE; // user with read-only permissions private static User USER_RO; // user is table owner. will have all permissions on table @@ -93,6 +98,7 @@ public class TestAccessController { private static MasterCoprocessorEnvironment CP_ENV; private static RegionCoprocessorEnvironment RCP_ENV; + private static RegionServerCoprocessorEnvironment RSCP_ENV; private static AccessController ACCESS_CONTROLLER; @BeforeClass @@ -107,6 +113,10 @@ public class TestAccessController { ACCESS_CONTROLLER = (AccessController) cpHost.findCoprocessor(AccessController.class.getName()); CP_ENV = cpHost.createEnvironment(AccessController.class, ACCESS_CONTROLLER, Coprocessor.PRIORITY_HIGHEST, 1, conf); + RegionServerCoprocessorHost rsHost = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0) + .getCoprocessorHost(); + RSCP_ENV = rsHost.createEnvironment(AccessController.class, ACCESS_CONTROLLER, + Coprocessor.PRIORITY_HIGHEST, 1, conf); // Wait for the ACL table to become available TEST_UTIL.waitTableAvailable(AccessControlLists.ACL_TABLE_NAME, 5000); @@ -116,6 +126,7 @@ public class TestAccessController { USER_ADMIN = User.createUserForTesting(conf, "admin2", new String[0]); USER_RW = User.createUserForTesting(conf, "rwuser", new String[0]); USER_RO = User.createUserForTesting(conf, "rouser", new String[0]); + USER_RW_ON_TABLE = User.createUserForTesting(conf, "rwuser_1", new String[0]); USER_OWNER = User.createUserForTesting(conf, "owner", new String[0]); USER_CREATE = User.createUserForTesting(conf, "tbl_create", new String[0]); USER_NONE = User.createUserForTesting(conf, "nouser", new String[0]); @@ -148,6 +159,9 @@ public class TestAccessController { protocol.grant(new UserPermission(Bytes.toBytes(USER_CREATE.getShortName()), TEST_TABLE, null, Permission.Action.CREATE)); + + protocol.grant(new UserPermission(Bytes.toBytes(USER_RW_ON_TABLE.getShortName()), TEST_TABLE, + null, Permission.Action.READ, Permission.Action.WRITE)); } @AfterClass @@ -161,6 +175,8 @@ public class TestAccessController { user.runAs(action); } catch (AccessDeniedException ade) { fail("Expected action to pass for user '" + user.getShortName() + "' but was denied"); + } catch (UnknownRowLockException exp){ + //expected } } } @@ -1271,4 +1287,70 @@ public class TestAccessController { } } + + @Test + public void testLockAction() throws Exception { + PrivilegedExceptionAction lockAction = new PrivilegedExceptionAction() { + public Object run() throws Exception { + ACCESS_CONTROLLER.preLockRow(ObserverContext.createAndPrepare(RCP_ENV, null), null, + Bytes.toBytes("random_row")); + return null; + } + }; + verifyAllowed(lockAction, SUPERUSER, USER_ADMIN, USER_OWNER, USER_CREATE, USER_RW_ON_TABLE); + verifyDenied(lockAction, USER_RO, USER_RW, USER_NONE); + } + + @Test + public void testUnLockAction() throws Exception { + PrivilegedExceptionAction unLockAction = new PrivilegedExceptionAction() { + public Object run() throws Exception { + ACCESS_CONTROLLER.preUnlockRow(ObserverContext.createAndPrepare(RCP_ENV, null), null, + 123456); + return null; + } + }; + verifyAllowed(unLockAction, SUPERUSER, USER_ADMIN, USER_OWNER, USER_RW_ON_TABLE); + verifyDenied(unLockAction, USER_NONE, USER_RO, USER_RW); + } + + @Test + public void testStopRegionServer() throws Exception { + PrivilegedExceptionAction action = new PrivilegedExceptionAction() { + public Object run() throws Exception { + ACCESS_CONTROLLER.preStopRegionServer(ObserverContext.createAndPrepare(RSCP_ENV, null)); + return null; + } + }; + + verifyAllowed(action, SUPERUSER, USER_ADMIN); + verifyDenied(action, USER_CREATE, USER_OWNER, USER_RW, USER_RO, USER_NONE); + } + + @Test + public void testOpenRegion() throws Exception { + PrivilegedExceptionAction action = new PrivilegedExceptionAction() { + public Object run() throws Exception { + ACCESS_CONTROLLER.preOpen(ObserverContext.createAndPrepare(RCP_ENV, null)); + return null; + } + }; + + verifyAllowed(action, SUPERUSER, USER_ADMIN); + verifyDenied(action, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_OWNER); + } + + @Test + public void testCloseRegion() throws Exception { + PrivilegedExceptionAction action = new PrivilegedExceptionAction() { + public Object run() throws Exception { + ACCESS_CONTROLLER.preClose(ObserverContext.createAndPrepare(RCP_ENV, null), false); + return null; + } + }; + + verifyAllowed(action, SUPERUSER, USER_ADMIN); + verifyDenied(action, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_OWNER); + } + } diff --git src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java index 5889f32..abc972c 100644 --- src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java +++ src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java @@ -58,14 +58,14 @@ public abstract class BaseRegionObserver implements RegionObserver { public void stop(CoprocessorEnvironment e) throws IOException { } @Override - public void preOpen(ObserverContext e) { } + public void preOpen(ObserverContext e) throws IOException { } @Override public void postOpen(ObserverContext e) { } @Override - public void preClose(ObserverContext e, - boolean abortRequested) { } + public void preClose(ObserverContext c, boolean abortRequested) + throws IOException { } @Override public void postClose(ObserverContext e, @@ -320,4 +320,20 @@ public abstract class BaseRegionObserver implements RegionObserver { List> familyPaths, boolean hasLoaded) throws IOException { return hasLoaded; } + + @Override + public void preLockRow(ObserverContext ctx, byte[] regionName, + byte[] row) throws IOException { } + + @Override + public void preUnlockRow(ObserverContext ctx, byte[] regionName, + long lockId) throws IOException { } + + @Override + public void postLockRow(ObserverContext ctx, byte[] regionName, + byte[] row) throws IOException { } + + @Override + public void postUnlockRow(ObserverContext ctx, byte[] regionName, + long lockId) throws IOException { } } diff --git src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java index 2fdaf6f..6575234 100644 --- src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java +++ src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java @@ -56,6 +56,8 @@ import java.util.jar.JarFile; public abstract class CoprocessorHost { public static final String REGION_COPROCESSOR_CONF_KEY = "hbase.coprocessor.region.classes"; + public static final String REGIONSERVER_COPROCESSOR_CONF_KEY = + "hbase.coprocessor.regionserver.classes"; public static final String USER_REGION_COPROCESSOR_CONF_KEY = "hbase.coprocessor.user.region.classes"; public static final String MASTER_COPROCESSOR_CONF_KEY = diff --git src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index 3dc2909..6d9b8e5 100644 --- src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -55,8 +55,9 @@ public interface RegionObserver extends Coprocessor { /** * Called before the region is reported as open to the master. * @param c the environment provided by the region server + * @throws IOException if an error occurred on the coprocessor */ - void preOpen(final ObserverContext c); + void preOpen(final ObserverContext c) throws IOException; /** * Called after the region is reported as open to the master. @@ -227,9 +228,10 @@ public interface RegionObserver extends Coprocessor { * Called before the region is reported as closed to the master. * @param c the environment provided by the region server * @param abortRequested true if the region server is aborting + * @throws IOException */ void preClose(final ObserverContext c, - boolean abortRequested); + boolean abortRequested) throws IOException; /** * Called after the region is reported as closed to the master. @@ -771,4 +773,47 @@ public interface RegionObserver extends Coprocessor { */ boolean postBulkLoadHFile(final ObserverContext ctx, List> familyPaths, boolean hasLoaded) throws IOException; + + /** + * Called before locking a row. + * + * @param ctx + * @param regionName + * @param row + * @throws IOException Signals that an I/O exception has occurred. + */ + void preLockRow(final ObserverContext ctx, + final byte[] regionName, final byte[] row) throws IOException; + + /** + * Called after locking a row. + * + * @param ctx + * @param regionName the region name + * @param row + * @throws IOException Signals that an I/O exception has occurred. + */ + void postLockRow(final ObserverContext ctx, + final byte[] regionName, final byte[] row) throws IOException; + + /** + * Called before unlocking a row. + * + * @param ctx + * @param regionName + * @param lockId the lock id + * @throws IOException Signals that an I/O exception has occurred. + */ + void preUnlockRow(final ObserverContext ctx, + final byte[] regionName, final long lockId) throws IOException; + + /** + * Called after unlocking a row. + * @param ctx + * @param regionName the region name + * @param lockId the lock id + * @throws IOException Signals that an I/O exception has occurred. + */ + void postUnlockRow(final ObserverContext ctx, + final byte[] regionName, final long lockId) throws IOException; } diff --git src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerCoprocessorEnvironment.java src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerCoprocessorEnvironment.java new file mode 100644 index 0000000..1bb152a --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerCoprocessorEnvironment.java @@ -0,0 +1,11 @@ +package org.apache.hadoop.hbase.coprocessor; + +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; + +public interface RegionServerCoprocessorEnvironment extends CoprocessorEnvironment { + + /** @return reference to the HMaster services */ + RegionServerServices getRegionServerServices(); + +} diff --git src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java new file mode 100644 index 0000000..d2a14d8 --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java @@ -0,0 +1,17 @@ +package org.apache.hadoop.hbase.coprocessor; + +import java.io.IOException; + +import org.apache.hadoop.hbase.Coprocessor; + +public interface RegionServerObserver extends Coprocessor { + + /** + * Called before stopping region server. + * @param env An instance of RegionServerCoprocessorEnvironment + * @throws IOException Signals that an I/O exception has occurred. + */ + void preStopRegionServer(final ObserverContext env) + throws IOException; + +} diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 5b17faa..d4fcbb2 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -169,7 +169,10 @@ import org.codehaus.jackson.map.ObjectMapper; import org.joda.time.field.MillisDurationField; import com.google.common.base.Function; +import com.google.common.collect.ClassToInstanceMap; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.MutableClassToInstanceMap; /** * HRegionServer makes a set of HRegions available to clients. It checks in with @@ -235,7 +238,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, // Server to handle client requests. Default access so can be accessed by // unit tests. RpcServer rpcServer; - + // Server to handle client requests. private HBaseServer server; @@ -365,6 +368,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, */ private ClusterId clusterId = null; + private RegionServerCoprocessorHost rsHost; + /** * Starts a HRegionServer at the default location * @@ -1015,6 +1020,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, // Init in here rather than in constructor after thread name has been set this.metrics = new RegionServerMetrics(); this.dynamicMetrics = RegionServerDynamicMetrics.newInstance(this); + this.rsHost = new RegionServerCoprocessorHost(this, this.conf); startServiceThreads(); LOG.info("Serving as " + this.serverNameFromMasterPOV + ", RPC listening on " + this.isa + @@ -1022,6 +1028,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId())); isOnline = true; } catch (Throwable e) { + LOG.warn("Exception in region server : ", e); this.isOnline = false; stop("Failed initialization"); throw convertThrowableToIOE(cleanup(e, "Failed init"), @@ -1577,6 +1584,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, this.splitLogWorker = new SplitLogWorker(this.zooKeeper, this.getConfiguration(), this.getServerName().toString()); splitLogWorker.start(); + } /** @@ -1644,10 +1652,15 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, @Override public void stop(final String msg) { - this.stopped = true; - LOG.info("STOPPED: " + msg); - // Wakes run() if it is sleeping - sleeper.skipSleepCycle(); + try { + this.rsHost.preStop(msg); + this.stopped = true; + LOG.info("STOPPED: " + msg); + // Wakes run() if it is sleeping + sleeper.skipSleepCycle(); + } catch (IOException exp) { + LOG.warn("The region server did not stop", exp); + } } public void waitForServerOnline(){ @@ -2615,6 +2628,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, requestCount.incrementAndGet(); try { HRegion region = getRegion(regionName); + if (region.getCoprocessorHost() != null) { + region.getCoprocessorHost().preLockRow(regionName, row); + } Integer r = region.obtainRowLock(row); long lockId = addRowLock(r, region); LOG.debug("Row lock " + lockId + " explicitly acquired by client"); @@ -2676,6 +2692,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, requestCount.incrementAndGet(); try { HRegion region = getRegion(regionName); + if (region.getCoprocessorHost() != null) { + region.getCoprocessorHost().preUnLockRow(regionName, lockId); + } String lockName = String.valueOf(lockId); Integer r = rowlocks.remove(lockName); if (r == null) { @@ -2852,6 +2871,11 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, final int versionOfClosingNode) throws IOException { checkOpen(); + //Check for permissions to close. + HRegion actualRegion = this.getFromOnlineRegions(region.getEncodedName()); + if (actualRegion.getCoprocessorHost() != null) { + actualRegion.getCoprocessorHost().preClose(false); + } LOG.info("Received close region: " + region.getRegionNameAsString() + ". Version of ZK closing node:" + versionOfClosingNode); boolean hasit = this.onlineRegions.containsKey(region.getEncodedName()); @@ -2899,6 +2923,17 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, */ protected boolean closeRegion(HRegionInfo region, final boolean abort, final boolean zk, final int versionOfClosingNode) { + + HRegion actualRegion = this.getFromOnlineRegions(region.getEncodedName()); + if ((actualRegion != null) && (actualRegion.getCoprocessorHost() !=null)){ + try { + actualRegion.getCoprocessorHost().preClose(abort); + } catch (IOException e) { + LOG.warn(e); + return false; + } + } + if (this.regionsInTransitionInRS.containsKey(region.getEncodedNameAsBytes())) { LOG.warn("Received close for region we are already opening or closing; " + region.getEncodedName()); @@ -3599,6 +3634,10 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, return this.zooKeeper; } + public RegionServerCoprocessorHost getCoprocessorHost(){ + return this.rsHost; + } + public ConcurrentSkipListMap getRegionsInTransitionInRS() { return this.regionsInTransitionInRS; diff --git src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 0f61539..27e194f 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -37,7 +37,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.CoprocessorEnvironment; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; @@ -265,7 +264,7 @@ public class RegionCoprocessorHost /** * Invoked before a region open */ - public void preOpen() { + public void preOpen(){ ObserverContext ctx = null; for (RegionEnvironment env: coprocessors) { if (env.getInstance() instanceof RegionObserver) { @@ -285,7 +284,7 @@ public class RegionCoprocessorHost /** * Invoked after a region open */ - public void postOpen() { + public void postOpen(){ ObserverContext ctx = null; for (RegionEnvironment env: coprocessors) { if (env.getInstance() instanceof RegionObserver) { @@ -306,7 +305,7 @@ public class RegionCoprocessorHost * Invoked before a region is closed * @param abortRequested true if the server is aborting */ - public void preClose(boolean abortRequested) { + public void preClose(boolean abortRequested) throws IOException { ObserverContext ctx = null; for (RegionEnvironment env: coprocessors) { if (env.getInstance() instanceof RegionObserver) { @@ -314,7 +313,7 @@ public class RegionCoprocessorHost try { ((RegionObserver)env.getInstance()).preClose(ctx, abortRequested); } catch (Throwable e) { - handleCoprocessorThrowableNoRethrow(env, e); + handleCoprocessorThrowable(env, e); } } } @@ -324,7 +323,7 @@ public class RegionCoprocessorHost * Invoked after a region is closed * @param abortRequested true if the server is aborting */ - public void postClose(boolean abortRequested) { + public void postClose(boolean abortRequested){ ObserverContext ctx = null; for (RegionEnvironment env: coprocessors) { if (env.getInstance() instanceof RegionObserver) { @@ -1483,5 +1482,31 @@ public class RegionCoprocessorHost return hasLoaded; } + + public void preLockRow(byte[] regionName, byte[] row) throws IOException { + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + ((RegionObserver) env.getInstance()).preLockRow(ctx, regionName, row); + if (ctx.shouldComplete()) { + break; + } + } + } + } + + public void preUnLockRow(byte[] regionName, long lockId) throws IOException { + ObserverContext ctx = null; + for (RegionEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + ((RegionObserver) env.getInstance()).preUnlockRow(ctx, regionName, lockId); + if (ctx.shouldComplete()) { + break; + } + } + } + } } diff --git src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java new file mode 100644 index 0000000..f395d3b --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java @@ -0,0 +1,86 @@ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.Comparator; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionServerObserver; + +public class RegionServerCoprocessorHost extends + CoprocessorHost { + + private RegionServerServices rsServices; + + public RegionServerCoprocessorHost(RegionServerServices rsServices, Configuration conf) { + this.rsServices = rsServices; + this.conf = conf; + // load system default cp's from configuration. + loadSystemCoprocessors(conf, REGIONSERVER_COPROCESSOR_CONF_KEY); + } + + @Override + public RegionServerEnvironment createEnvironment(Class implClass, Coprocessor instance, + int priority, int sequence, Configuration conf) { + return new RegionServerEnvironment(implClass, instance, priority, sequence, conf, + this.rsServices); + } + + public void preStop(String message) throws IOException { + ObserverContext ctx = null; + for (RegionServerEnvironment env : coprocessors) { + if (env.getInstance() instanceof RegionServerObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + ((RegionServerObserver) env.getInstance()).preStopRegionServer(ctx); + if (ctx.shouldComplete()) { + break; + } + } + } + } + + /** + * Coprocessor environment extension providing access to region server related services. + */ + static class RegionServerEnvironment extends CoprocessorHost.Environment implements + RegionServerCoprocessorEnvironment { + + private RegionServerServices regionServerServices; + + public RegionServerEnvironment(final Class implClass, final Coprocessor impl, + final int priority, final int seq, final Configuration conf, + final RegionServerServices services) { + super(impl, priority, seq, conf); + this.regionServerServices = services; + } + + @Override + public RegionServerServices getRegionServerServices() { + return regionServerServices; + } + } + + /** + * Environment priority comparator. Coprocessors are chained in sorted order. + */ + static class EnvironmentPriorityComparator implements Comparator { + public int compare(final CoprocessorEnvironment env1, final CoprocessorEnvironment env2) { + if (env1.getPriority() < env2.getPriority()) { + return -1; + } else if (env1.getPriority() > env2.getPriority()) { + return 1; + } + if (env1.getLoadSequence() < env2.getLoadSequence()) { + return -1; + } else if (env1.getLoadSequence() > env2.getLoadSequence()) { + return 1; + } + return 0; + } + } + +}