diff --git security/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java security/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index b82ad5d..eff3e76 100644
--- security/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ security/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -46,6 +46,8 @@ import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
@@ -62,6 +64,7 @@ import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.Permission.Action;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
@@ -101,7 +104,7 @@ import com.google.common.collect.Sets;
*
*/
public class AccessController extends BaseRegionObserver
- implements MasterObserver, AccessControllerProtocol {
+ implements MasterObserver, RegionServerObserver, AccessControllerProtocol {
/**
* Represents the result of an authorization check for logging and error
* reporting.
@@ -510,18 +513,32 @@ public class AccessController extends BaseRegionObserver
}
/* ---- MasterObserver implementation ---- */
- public void start(CoprocessorEnvironment env) throws IOException {
- // if running on HMaster
+ public void start(CoprocessorEnvironment env) throws IOException {
+
+ ZooKeeperWatcher zk = null;
if (env instanceof MasterCoprocessorEnvironment) {
- MasterCoprocessorEnvironment e = (MasterCoprocessorEnvironment)env;
- this.authManager = TableAuthManager.get(
- e.getMasterServices().getZooKeeper(),
- e.getConfiguration());
+ // if running on HMaster
+ MasterCoprocessorEnvironment mEnv = (MasterCoprocessorEnvironment) env;
+ zk = mEnv.getMasterServices().getZooKeeper();
+ } else if (env instanceof RegionServerCoprocessorEnvironment) {
+ RegionServerCoprocessorEnvironment rsEnv = (RegionServerCoprocessorEnvironment) env;
+ zk = rsEnv.getRegionServerServices().getZooKeeper();
+ } else if (env instanceof RegionCoprocessorEnvironment) {
+ // if running at region
+ regionEnv = (RegionCoprocessorEnvironment) env;
+ zk = regionEnv.getRegionServerServices().getZooKeeper();
}
- // if running at region
- if (env instanceof RegionCoprocessorEnvironment) {
- regionEnv = (RegionCoprocessorEnvironment)env;
+ // If zk is null or IOException while obtaining auth manager,
+ // throw RuntimeException so that the coprocessor is unloaded.
+ if (zk != null) {
+ try {
+ this.authManager = TableAuthManager.get(zk, env.getConfiguration());
+ } catch (IOException ioe) {
+ throw new RuntimeException("Error obtaining TableAuthManager", ioe);
+ }
+ } else {
+ throw new RuntimeException("Error obtaining TableAuthManager, zk found null.");
}
}
@@ -705,29 +722,36 @@ public class AccessController extends BaseRegionObserver
/* ---- RegionObserver implementation ---- */
+
+ @Override
+ public void preOpen(ObserverContext e) throws IOException {
+ RegionCoprocessorEnvironment env = e.getEnvironment();
+ final HRegion region = env.getRegion();
+ if (region == null) {
+ LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()");
+ return;
+ } else {
+ HRegionInfo regionInfo = region.getRegionInfo();
+ if(isSpecialTable(regionInfo)){
+ isSystemOrSuperUser(regionEnv.getConfiguration());
+ }else{
+ requirePermission(Action.ADMIN);
+ }
+ }
+ }
@Override
public void postOpen(ObserverContext c) {
- RegionCoprocessorEnvironment e = c.getEnvironment();
- final HRegion region = e.getRegion();
+ RegionCoprocessorEnvironment env = c.getEnvironment();
+ final HRegion region = env.getRegion();
if (region == null) {
LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
return;
}
-
- try {
- this.authManager = TableAuthManager.get(
- e.getRegionServerServices().getZooKeeper(),
- regionEnv.getConfiguration());
- } catch (IOException ioe) {
- // pass along as a RuntimeException, so that the coprocessor is unloaded
- throw new RuntimeException("Error obtaining TableAuthManager", ioe);
- }
-
if (AccessControlLists.isAclRegion(region)) {
aclRegion = true;
try {
- initialize(e);
+ initialize(env);
} catch (IOException ex) {
// if we can't obtain permissions, it's better to fail
// than perform checks incorrectly
@@ -1127,4 +1151,60 @@ public class AccessController extends BaseRegionObserver
}
return tableName;
}
+
+
+ @Override
+ public void preClose(ObserverContext e, boolean abortRequested)
+ throws IOException {
+ requirePermission(Permission.Action.ADMIN);
+ }
+
+ @Override
+ public void preLockRow(ObserverContext ctx, byte[] regionName,
+ byte[] row) throws IOException {
+ requirePermission(getTableName(ctx.getEnvironment()), null, null, TablePermission.Action.WRITE,
+ TablePermission.Action.CREATE);
+ }
+
+ @Override
+ public void preUnlockRow(ObserverContext ctx, byte[] regionName,
+ long lockId) throws IOException {
+ requirePermission(getTableName(ctx.getEnvironment()), null, null, Action.WRITE, Action.CREATE);
+ }
+
+ private void isSystemOrSuperUser(Configuration conf) throws IOException{
+ User user = User.getCurrent();
+ if (user == null) {
+ throw new IOException("Unable to obtain the current user, " +
+ "authorization checks for internal operations will not work correctly!");
+ }
+
+ String currentUser = user.getShortName();
+ List superusers = Lists.asList(currentUser, conf.getStrings(
+ AccessControlLists.SUPERUSER_CONF_KEY, new String[0]));
+
+ User activeUser = getActiveUser();
+ if (!(superusers.contains(activeUser.getShortName()))) {
+ throw new AccessDeniedException("User '" + (user != null ? user.getShortName() : "null")
+ + "is not system or super user.");
+ }
+ }
+
+ private boolean isSpecialTable(HRegionInfo regionInfo) {
+ byte[] tableName = regionInfo.getTableName();
+ if (tableName.equals(AccessControlLists.ACL_TABLE_NAME)
+ || tableName.equals(Bytes.toBytes("-ROOT-")) || tableName.equals(Bytes.toBytes(".META."))) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public void preStopRegionServer(
+ ObserverContext env)
+ throws IOException {
+ requirePermission(Permission.Action.ADMIN);
+ }
+
}
diff --git security/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java security/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
index 1a087b6..4aa0e18 100644
--- security/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
+++ security/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.security.access;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.ipc.SecureRpcEngine;
import org.apache.hadoop.hbase.security.User;
@@ -32,8 +33,9 @@ public class SecureTestUtil {
conf.set("hadoop.security.authorization", "false");
conf.set("hadoop.security.authentication", "simple");
conf.set("hbase.rpc.engine", SecureRpcEngine.class.getName());
- conf.set("hbase.coprocessor.master.classes", AccessController.class.getName());
- conf.set("hbase.coprocessor.region.classes", AccessController.class.getName());
+ conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, AccessController.class.getName());
+ conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, AccessController.class.getName());
+ conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, AccessController.class.getName());
// add the process running user to superusers
String currentUser = User.getCurrent().getName();
conf.set("hbase.superuser", "admin,"+currentUser);
diff --git security/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java security/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
index 765f0af..aa6ce74 100644
--- security/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
+++ security/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.UnknownRowLockException;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
@@ -51,9 +52,11 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.Permission.Action;
@@ -79,6 +82,8 @@ public class TestAccessController {
private static User USER_ADMIN;
// user with rw permissions
private static User USER_RW;
+ // user with rw permissions on table.
+ private static User USER_RW_ON_TABLE;
// user with read-only permissions
private static User USER_RO;
// user is table owner. will have all permissions on table
@@ -93,6 +98,7 @@ public class TestAccessController {
private static MasterCoprocessorEnvironment CP_ENV;
private static RegionCoprocessorEnvironment RCP_ENV;
+ private static RegionServerCoprocessorEnvironment RSCP_ENV;
private static AccessController ACCESS_CONTROLLER;
@BeforeClass
@@ -107,6 +113,10 @@ public class TestAccessController {
ACCESS_CONTROLLER = (AccessController) cpHost.findCoprocessor(AccessController.class.getName());
CP_ENV = cpHost.createEnvironment(AccessController.class, ACCESS_CONTROLLER,
Coprocessor.PRIORITY_HIGHEST, 1, conf);
+ RegionServerCoprocessorHost rsHost = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0)
+ .getCoprocessorHost();
+ RSCP_ENV = rsHost.createEnvironment(AccessController.class, ACCESS_CONTROLLER,
+ Coprocessor.PRIORITY_HIGHEST, 1, conf);
// Wait for the ACL table to become available
TEST_UTIL.waitTableAvailable(AccessControlLists.ACL_TABLE_NAME, 5000);
@@ -116,6 +126,7 @@ public class TestAccessController {
USER_ADMIN = User.createUserForTesting(conf, "admin2", new String[0]);
USER_RW = User.createUserForTesting(conf, "rwuser", new String[0]);
USER_RO = User.createUserForTesting(conf, "rouser", new String[0]);
+ USER_RW_ON_TABLE = User.createUserForTesting(conf, "rwuser_1", new String[0]);
USER_OWNER = User.createUserForTesting(conf, "owner", new String[0]);
USER_CREATE = User.createUserForTesting(conf, "tbl_create", new String[0]);
USER_NONE = User.createUserForTesting(conf, "nouser", new String[0]);
@@ -148,6 +159,9 @@ public class TestAccessController {
protocol.grant(new UserPermission(Bytes.toBytes(USER_CREATE.getShortName()), TEST_TABLE, null,
Permission.Action.CREATE));
+
+ protocol.grant(new UserPermission(Bytes.toBytes(USER_RW_ON_TABLE.getShortName()), TEST_TABLE,
+ null, Permission.Action.READ, Permission.Action.WRITE));
}
@AfterClass
@@ -161,6 +175,8 @@ public class TestAccessController {
user.runAs(action);
} catch (AccessDeniedException ade) {
fail("Expected action to pass for user '" + user.getShortName() + "' but was denied");
+ } catch (UnknownRowLockException exp){
+ //expected
}
}
}
@@ -1271,4 +1287,70 @@ public class TestAccessController {
}
}
+
+ @Test
+ public void testLockAction() throws Exception {
+ PrivilegedExceptionAction lockAction = new PrivilegedExceptionAction() {
+ public Object run() throws Exception {
+ ACCESS_CONTROLLER.preLockRow(ObserverContext.createAndPrepare(RCP_ENV, null), null,
+ Bytes.toBytes("random_row"));
+ return null;
+ }
+ };
+ verifyAllowed(lockAction, SUPERUSER, USER_ADMIN, USER_OWNER, USER_CREATE, USER_RW_ON_TABLE);
+ verifyDenied(lockAction, USER_RO, USER_RW, USER_NONE);
+ }
+
+ @Test
+ public void testUnLockAction() throws Exception {
+ PrivilegedExceptionAction unLockAction = new PrivilegedExceptionAction() {
+ public Object run() throws Exception {
+ ACCESS_CONTROLLER.preUnlockRow(ObserverContext.createAndPrepare(RCP_ENV, null), null,
+ 123456);
+ return null;
+ }
+ };
+ verifyAllowed(unLockAction, SUPERUSER, USER_ADMIN, USER_OWNER, USER_RW_ON_TABLE);
+ verifyDenied(unLockAction, USER_NONE, USER_RO, USER_RW);
+ }
+
+ @Test
+ public void testStopRegionServer() throws Exception {
+ PrivilegedExceptionAction action = new PrivilegedExceptionAction() {
+ public Object run() throws Exception {
+ ACCESS_CONTROLLER.preStopRegionServer(ObserverContext.createAndPrepare(RSCP_ENV, null));
+ return null;
+ }
+ };
+
+ verifyAllowed(action, SUPERUSER, USER_ADMIN);
+ verifyDenied(action, USER_CREATE, USER_OWNER, USER_RW, USER_RO, USER_NONE);
+ }
+
+ @Test
+ public void testOpenRegion() throws Exception {
+ PrivilegedExceptionAction action = new PrivilegedExceptionAction() {
+ public Object run() throws Exception {
+ ACCESS_CONTROLLER.preOpen(ObserverContext.createAndPrepare(RCP_ENV, null));
+ return null;
+ }
+ };
+
+ verifyAllowed(action, SUPERUSER, USER_ADMIN);
+ verifyDenied(action, USER_CREATE, USER_RW, USER_RO, USER_NONE,USER_OWNER);
+ }
+
+ @Test
+ public void testCloseRegion() throws Exception {
+ PrivilegedExceptionAction action = new PrivilegedExceptionAction() {
+ public Object run() throws Exception {
+ ACCESS_CONTROLLER.preClose(ObserverContext.createAndPrepare(RCP_ENV, null), false);
+ return null;
+ }
+ };
+
+ verifyAllowed(action, SUPERUSER, USER_ADMIN);
+ verifyDenied(action, USER_CREATE, USER_RW, USER_RO, USER_NONE,USER_OWNER);
+ }
+
}
diff --git src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
index 5889f32..3b15f9a 100644
--- src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
+++ src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
@@ -58,14 +58,17 @@ public abstract class BaseRegionObserver implements RegionObserver {
public void stop(CoprocessorEnvironment e) throws IOException { }
@Override
- public void preOpen(ObserverContext e) { }
+ public void preOpen(ObserverContext e) throws IOException {
+ }
@Override
- public void postOpen(ObserverContext e) { }
+ public void postOpen(ObserverContext e) {
+ }
@Override
- public void preClose(ObserverContext e,
- boolean abortRequested) { }
+ public void preClose(ObserverContext c, boolean abortRequested)
+ throws IOException {
+ }
@Override
public void postClose(ObserverContext e,
@@ -320,4 +323,24 @@ public abstract class BaseRegionObserver implements RegionObserver {
List> familyPaths, boolean hasLoaded) throws IOException {
return hasLoaded;
}
+
+ @Override
+ public void preLockRow(ObserverContext ctx, byte[] regionName,
+ byte[] row) throws IOException {
+ }
+
+ @Override
+ public void preUnlockRow(ObserverContext ctx, byte[] regionName,
+ long lockId) throws IOException {
+ }
+
+ @Override
+ public void postLockRow(ObserverContext ctx, byte[] regionName,
+ byte[] row) throws IOException {
+ }
+
+ @Override
+ public void postUnlockRow(ObserverContext ctx, byte[] regionName,
+ long lockId) throws IOException {
+ }
}
diff --git src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
index 2fdaf6f..6575234 100644
--- src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
+++ src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
@@ -56,6 +56,8 @@ import java.util.jar.JarFile;
public abstract class CoprocessorHost {
public static final String REGION_COPROCESSOR_CONF_KEY =
"hbase.coprocessor.region.classes";
+ public static final String REGIONSERVER_COPROCESSOR_CONF_KEY =
+ "hbase.coprocessor.regionserver.classes";
public static final String USER_REGION_COPROCESSOR_CONF_KEY =
"hbase.coprocessor.user.region.classes";
public static final String MASTER_COPROCESSOR_CONF_KEY =
diff --git src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index 3dc2909..6d9b8e5 100644
--- src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -55,8 +55,9 @@ public interface RegionObserver extends Coprocessor {
/**
* Called before the region is reported as open to the master.
* @param c the environment provided by the region server
+ * @throws IOException if an error occurred on the coprocessor
*/
- void preOpen(final ObserverContext c);
+ void preOpen(final ObserverContext c) throws IOException;
/**
* Called after the region is reported as open to the master.
@@ -227,9 +228,10 @@ public interface RegionObserver extends Coprocessor {
* Called before the region is reported as closed to the master.
* @param c the environment provided by the region server
* @param abortRequested true if the region server is aborting
+ * @throws IOException
*/
void preClose(final ObserverContext c,
- boolean abortRequested);
+ boolean abortRequested) throws IOException;
/**
* Called after the region is reported as closed to the master.
@@ -771,4 +773,47 @@ public interface RegionObserver extends Coprocessor {
*/
boolean postBulkLoadHFile(final ObserverContext ctx,
List> familyPaths, boolean hasLoaded) throws IOException;
+
+ /**
+ * Called before locking a row.
+ *
+ * @param ctx
+ * @param regionName
+ * @param row
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ void preLockRow(final ObserverContext ctx,
+ final byte[] regionName, final byte[] row) throws IOException;
+
+ /**
+ * Called after locking a row.
+ *
+ * @param ctx
+ * @param regionName the region name
+ * @param row
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ void postLockRow(final ObserverContext ctx,
+ final byte[] regionName, final byte[] row) throws IOException;
+
+ /**
+ * Called before unlocking a row.
+ *
+ * @param ctx
+ * @param regionName
+ * @param lockId the lock id
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ void preUnlockRow(final ObserverContext ctx,
+ final byte[] regionName, final long lockId) throws IOException;
+
+ /**
+ * Called after unlocking a row.
+ * @param ctx
+ * @param regionName the region name
+ * @param lockId the lock id
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ void postUnlockRow(final ObserverContext ctx,
+ final byte[] regionName, final long lockId) throws IOException;
}
diff --git src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerCoprocessorEnvironment.java src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerCoprocessorEnvironment.java
new file mode 100644
index 0000000..23e16e8
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerCoprocessorEnvironment.java
@@ -0,0 +1,11 @@
+package org.apache.hadoop.hbase.coprocessor;
+
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+
+public interface RegionServerCoprocessorEnvironment extends CoprocessorEnvironment{
+
+ /** @return reference to the HMaster services */
+ RegionServerServices getRegionServerServices();
+
+}
diff --git src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java
new file mode 100644
index 0000000..0ccbd82
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java
@@ -0,0 +1,19 @@
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Coprocessor;
+
+public interface RegionServerObserver extends Coprocessor {
+
+
+ /**
+ * Called before stopping region server.
+ * @param env An instance of RegionServerCoprocessorEnvironment
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ void preStopRegionServer(
+ final ObserverContext env)
+ throws IOException;
+
+}
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index b3181d6..9f6ca0c 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -168,7 +168,10 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.joda.time.field.MillisDurationField;
import com.google.common.base.Function;
+import com.google.common.collect.ClassToInstanceMap;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.MutableClassToInstanceMap;
/**
* HRegionServer makes a set of HRegions available to clients. It checks in with
@@ -363,6 +366,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
* ClusterId
*/
private ClusterId clusterId = null;
+
+ private RegionServerCoprocessorHost rsHost;
/**
* Starts a HRegionServer at the default location
@@ -1014,13 +1019,15 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
// Init in here rather than in constructor after thread name has been set
this.metrics = new RegionServerMetrics();
this.dynamicMetrics = RegionServerDynamicMetrics.newInstance(this);
- startServiceThreads();
+ this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
+ startServiceThreads();
LOG.info("Serving as " + this.serverNameFromMasterPOV +
", RPC listening on " + this.isa +
", sessionid=0x" +
Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()));
isOnline = true;
} catch (Throwable e) {
+ LOG.warn("Exception in region server : ", e);
this.isOnline = false;
stop("Failed initialization");
throw convertThrowableToIOE(cleanup(e, "Failed init"),
@@ -1577,6 +1584,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
this.splitLogWorker = new SplitLogWorker(this.zooKeeper,
this.getConfiguration(), this.getServerName().toString());
splitLogWorker.start();
+
}
/**
@@ -1644,10 +1652,15 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
@Override
public void stop(final String msg) {
- this.stopped = true;
- LOG.info("STOPPED: " + msg);
- // Wakes run() if it is sleeping
- sleeper.skipSleepCycle();
+ try {
+ this.rsHost.preStop(msg);
+ this.stopped = true;
+ LOG.info("STOPPED: " + msg);
+ // Wakes run() if it is sleeping
+ sleeper.skipSleepCycle();
+ } catch (IOException exp) {
+ LOG.warn("The region server did not stop", exp);
+ }
}
public void waitForServerOnline(){
@@ -2615,6 +2628,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
requestCount.incrementAndGet();
try {
HRegion region = getRegion(regionName);
+ if (region.getCoprocessorHost() != null) {
+ region.getCoprocessorHost().preLockRow(regionName, row);
+ }
Integer r = region.obtainRowLock(row);
long lockId = addRowLock(r, region);
LOG.debug("Row lock " + lockId + " explicitly acquired by client");
@@ -2676,6 +2692,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
requestCount.incrementAndGet();
try {
HRegion region = getRegion(regionName);
+ if (region.getCoprocessorHost() != null) {
+ region.getCoprocessorHost().preUnLockRow(regionName, lockId);
+ }
String lockName = String.valueOf(lockId);
Integer r = rowlocks.remove(lockName);
if (r == null) {
@@ -2852,6 +2871,11 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
final int versionOfClosingNode)
throws IOException {
checkOpen();
+ //Check for permissions to close.
+ HRegion actualRegion = this.getFromOnlineRegions(region.getEncodedName());
+ if (actualRegion.getCoprocessorHost() !=null){
+ actualRegion.getCoprocessorHost().preClose(false);
+ }
LOG.info("Received close region: " + region.getRegionNameAsString() +
". Version of ZK closing node:" + versionOfClosingNode);
boolean hasit = this.onlineRegions.containsKey(region.getEncodedName());
@@ -2899,6 +2923,17 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
*/
protected boolean closeRegion(HRegionInfo region, final boolean abort,
final boolean zk, final int versionOfClosingNode) {
+
+ HRegion actualRegion = this.getFromOnlineRegions(region.getEncodedName());
+ if ((actualRegion != null) && (actualRegion.getCoprocessorHost() !=null)){
+ try {
+ actualRegion.getCoprocessorHost().preClose(abort);
+ } catch (IOException e) {
+ LOG.warn(e);
+ return false;
+ }
+ }
+
if (this.regionsInTransitionInRS.containsKey(region.getEncodedNameAsBytes())) {
LOG.warn("Received close for region we are already opening or closing; " +
region.getEncodedName());
@@ -3598,6 +3633,10 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
public ZooKeeperWatcher getZooKeeperWatcher() {
return this.zooKeeper;
}
+
+ public RegionServerCoprocessorHost getCoprocessorHost(){
+ return this.rsHost;
+ }
public ConcurrentSkipListMap getRegionsInTransitionInRS() {
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index 0f61539..27e194f 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -265,7 +264,7 @@ public class RegionCoprocessorHost
/**
* Invoked before a region open
*/
- public void preOpen() {
+ public void preOpen(){
ObserverContext ctx = null;
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
@@ -285,7 +284,7 @@ public class RegionCoprocessorHost
/**
* Invoked after a region open
*/
- public void postOpen() {
+ public void postOpen(){
ObserverContext ctx = null;
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
@@ -306,7 +305,7 @@ public class RegionCoprocessorHost
* Invoked before a region is closed
* @param abortRequested true if the server is aborting
*/
- public void preClose(boolean abortRequested) {
+ public void preClose(boolean abortRequested) throws IOException {
ObserverContext ctx = null;
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
@@ -314,7 +313,7 @@ public class RegionCoprocessorHost
try {
((RegionObserver)env.getInstance()).preClose(ctx, abortRequested);
} catch (Throwable e) {
- handleCoprocessorThrowableNoRethrow(env, e);
+ handleCoprocessorThrowable(env, e);
}
}
}
@@ -324,7 +323,7 @@ public class RegionCoprocessorHost
* Invoked after a region is closed
* @param abortRequested true if the server is aborting
*/
- public void postClose(boolean abortRequested) {
+ public void postClose(boolean abortRequested){
ObserverContext ctx = null;
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
@@ -1483,5 +1482,31 @@ public class RegionCoprocessorHost
return hasLoaded;
}
+
+ public void preLockRow(byte[] regionName, byte[] row) throws IOException {
+ ObserverContext ctx = null;
+ for (RegionEnvironment env : coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ ctx = ObserverContext.createAndPrepare(env, ctx);
+ ((RegionObserver) env.getInstance()).preLockRow(ctx, regionName, row);
+ if (ctx.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ }
+
+ public void preUnLockRow(byte[] regionName, long lockId) throws IOException {
+ ObserverContext ctx = null;
+ for (RegionEnvironment env : coprocessors) {
+ if (env.getInstance() instanceof RegionObserver) {
+ ctx = ObserverContext.createAndPrepare(env, ctx);
+ ((RegionObserver) env.getInstance()).preUnlockRow(ctx, regionName, lockId);
+ if (ctx.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ }
}
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
new file mode 100644
index 0000000..de1279b
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java
@@ -0,0 +1,92 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Comparator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
+
+public class RegionServerCoprocessorHost extends
+ CoprocessorHost {
+
+ private RegionServerServices rsServices;
+
+ public RegionServerCoprocessorHost(RegionServerServices rsServices,
+ Configuration conf) {
+ this.rsServices = rsServices;
+ this.conf = conf;
+ // load system default cp's from configuration.
+ loadSystemCoprocessors(conf, REGIONSERVER_COPROCESSOR_CONF_KEY);
+ }
+
+ @Override
+ public RegionServerEnvironment createEnvironment(Class> implClass,
+ Coprocessor instance, int priority, int sequence, Configuration conf) {
+ return new RegionServerEnvironment(implClass, instance, priority,
+ sequence, conf, this.rsServices);
+ }
+
+ public void preStop(String message) throws IOException {
+ ObserverContext ctx = null;
+ for (RegionServerEnvironment env : coprocessors) {
+ if (env.getInstance() instanceof RegionServerObserver) {
+ ctx = ObserverContext.createAndPrepare(env, ctx);
+ ((RegionServerObserver) env.getInstance())
+ .preStopRegionServer(ctx);
+ if (ctx.shouldComplete()) {
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+ * Coprocessor environment extension providing access to region server
+ * related services.
+ */
+ static class RegionServerEnvironment extends CoprocessorHost.Environment
+ implements RegionServerCoprocessorEnvironment {
+
+ private RegionServerServices regionServerServices;
+
+ public RegionServerEnvironment(final Class> implClass,
+ final Coprocessor impl, final int priority, final int seq,
+ final Configuration conf, final RegionServerServices services) {
+ super(impl, priority, seq, conf);
+ this.regionServerServices = services;
+ }
+
+ @Override
+ public RegionServerServices getRegionServerServices() {
+ return regionServerServices;
+ }
+ }
+
+ /**
+ * Environment priority comparator. Coprocessors are chained in sorted
+ * order.
+ */
+ static class EnvironmentPriorityComparator implements
+ Comparator {
+ public int compare(final CoprocessorEnvironment env1,
+ final CoprocessorEnvironment env2) {
+ if (env1.getPriority() < env2.getPriority()) {
+ return -1;
+ } else if (env1.getPriority() > env2.getPriority()) {
+ return 1;
+ }
+ if (env1.getLoadSequence() < env2.getLoadSequence()) {
+ return -1;
+ } else if (env1.getLoadSequence() > env2.getLoadSequence()) {
+ return 1;
+ }
+ return 0;
+ }
+ }
+
+}