From a478c01c38d9a562fe059fc954653d33eda8247c Mon Sep 17 00:00:00 2001 From: Mikhail Antonov Date: Wed, 8 Apr 2015 18:31:32 -0700 Subject: [PATCH] HBASE-13375 Provide HBase superuser higher priority over other users in the RPC handling --- .../apache/hadoop/hbase/ipc/PriorityFunction.java | 4 +- .../hadoop/hbase/ipc/SimpleRpcScheduler.java | 2 +- .../AnnotationReadingPriorityFunction.java | 45 +++++++++++++++++++++- .../hadoop/hbase/regionserver/RSRpcServices.java | 5 ++- .../org/apache/hadoop/hbase/client/TestAdmin2.java | 2 +- .../hadoop/hbase/ipc/TestSimpleRpcScheduler.java | 11 ++++-- .../hadoop/hbase/regionserver/TestPriorityRpc.java | 37 +++++++++++++++--- .../hadoop/hbase/regionserver/TestQosFunction.java | 4 +- .../apache/hadoop/hbase/util/TestHBaseFsck.java | 3 +- 9 files changed, 95 insertions(+), 18 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java index e323e78..765e83e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.security.UserGroupInformation; /** * Function to figure priority of incoming request. @@ -34,9 +35,10 @@ public interface PriorityFunction { * The returned value is mainly used to select the dispatch queue. * @param header * @param param + * @param remoteUgi * @return Priority of this request. */ - int getPriority(RequestHeader header, Message param); + int getPriority(RequestHeader header, Message param, UserGroupInformation remoteUgi); /** * Returns the deadline of the specified request. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index d8ae3ba..4455e17 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -192,7 +192,7 @@ public class SimpleRpcScheduler extends RpcScheduler { @Override public void dispatch(CallRunner callTask) throws InterruptedException { RpcServer.Call call = callTask.getCall(); - int level = priority.getPriority(call.getHeader(), call.param); + int level = priority.getPriority(call.getHeader(), call.param, call.getRemoteUser()); if (priorityExecutor != null && level > highPriorityLevel) { priorityExecutor.dispatch(callTask); } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java index 29228db..b1c5050 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java @@ -17,8 +17,10 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.io.IOException; import java.lang.reflect.Method; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import org.apache.commons.logging.Log; @@ -44,6 +46,8 @@ import org.apache.hadoop.hbase.regionserver.RSRpcServices.QosPriority; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.Message; import com.google.protobuf.TextFormat; +import org.apache.hadoop.hbase.security.visibility.VisibilityUtils; +import org.apache.hadoop.security.UserGroupInformation; /** @@ -100,6 +104,12 @@ class AnnotationReadingPriorityFunction implements PriorityFunction { private final float scanVirtualTimeWeight; + // lists of super users and super groups, used to route rpc calls made by + // superusers through high-priority (ADMIN_QOS) thread pool. + // made protected for tests + protected final HashSet superUsers; + protected final HashSet superGroups; + AnnotationReadingPriorityFunction(final RSRpcServices rpcServices) { Map qosMap = new HashMap(); for (Method m : RSRpcServices.class.getMethods()) { @@ -132,6 +142,15 @@ class AnnotationReadingPriorityFunction implements PriorityFunction { Configuration conf = rpcServices.getConfiguration(); scanVirtualTimeWeight = conf.getFloat(SCAN_VTIME_WEIGHT_CONF_KEY, 1.0f); + + try { + superUsers = new HashSet<>(VisibilityUtils.getSystemAndSuperUsers( + rpcServices.getConfiguration()).getFirst()); + superGroups =new HashSet<>(VisibilityUtils.getSystemAndSuperUsers( + rpcServices.getConfiguration()).getSecond()); + } catch (IOException e) { + throw new RuntimeException(e); + } } private String capitalize(final String s) { @@ -149,12 +168,19 @@ class AnnotationReadingPriorityFunction implements PriorityFunction { * NORMAL_QOS (user requests). */ @Override - public int getPriority(RequestHeader header, Message param) { + public int getPriority(RequestHeader header, Message param, + UserGroupInformation remoteUgi) { String methodName = header.getMethodName(); Integer priorityByAnnotation = annotatedQos.get(methodName); if (priorityByAnnotation != null) { return priorityByAnnotation; } + + // all requests executed by super users have high QoS + if (isExecutedBySuperUser(remoteUgi)) { + return HConstants.ADMIN_QOS; + } + if (param == null) { return HConstants.NORMAL_QOS; } @@ -238,4 +264,21 @@ class AnnotationReadingPriorityFunction implements PriorityFunction { void setRegionServer(final HRegionServer hrs) { this.rpcServices = hrs.getRSRpcServices(); } + + /** + * @param remoteUgi ugi of user running request + * @return true if user is super user, false otherwise + */ + private boolean isExecutedBySuperUser(UserGroupInformation remoteUgi) { + if (superUsers.contains(remoteUgi.getShortUserName())) { + return true; + } + + for (String group : remoteUgi.getGroupNames()) { + if (superGroups.contains(group)) { + return true; + } + } + return false; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 10e39a1..f42cbc2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -169,6 +169,7 @@ import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.net.DNS; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.zookeeper.KeeperException; import com.google.protobuf.ByteString; @@ -953,8 +954,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } @Override - public int getPriority(RequestHeader header, Message param) { - return priority.getPriority(header, param); + public int getPriority(RequestHeader header, Message param, UserGroupInformation remoteUgi) { + return priority.getPriority(header, param, remoteUgi); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java index a1da440..c5f7ba0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java @@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.constraint.ConstraintException; import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.testclassification.ClientTests; @@ -87,6 +86,7 @@ public class TestAdmin2 { TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6); TEST_UTIL.getConfiguration().setBoolean( "hbase.master.enabletable.roundrobin", true); + TEST_UTIL.getConfiguration().setInt("hbase.regionserver.metahandler.count", 20); TEST_UTIL.startMiniCluster(3); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java index 11ac43f..3015784 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.security.UserGroupInformation; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -125,7 +126,8 @@ public class TestSimpleRpcScheduler { scheduler.init(CONTEXT); scheduler.start(); for (CallRunner task : tasks) { - when(qosFunction.getPriority((RPCProtos.RequestHeader) anyObject(), (Message) anyObject())) + when(qosFunction.getPriority((RPCProtos.RequestHeader) anyObject(), + (Message) anyObject(), (UserGroupInformation) anyObject())) .thenReturn(qos.get(task)); scheduler.dispatch(task); } @@ -157,7 +159,8 @@ public class TestSimpleRpcScheduler { schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY, queueType); PriorityFunction priority = mock(PriorityFunction.class); - when(priority.getPriority(any(RequestHeader.class), any(Message.class))) + when(priority.getPriority(any(RequestHeader.class), + any(Message.class), any(UserGroupInformation.class))) .thenReturn(HConstants.NORMAL_QOS); RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, @@ -235,8 +238,8 @@ public class TestSimpleRpcScheduler { schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f); PriorityFunction priority = mock(PriorityFunction.class); - when(priority.getPriority(any(RequestHeader.class), any(Message.class))) - .thenReturn(HConstants.NORMAL_QOS); + when(priority.getPriority(any(RequestHeader.class), any(Message.class), + any(UserGroupInformation.class))).thenReturn(HConstants.NORMAL_QOS); RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 3, 1, 1, priority, HConstants.QOS_THRESHOLD); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java index dc18408..4f8c707 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.security.UserGroupInformation; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -95,7 +96,8 @@ public class TestPriorityRpc { Mockito.when(mockRegionInfo.isSystemTable()).thenReturn(true); // Presume type. ((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS); - assertEquals(HConstants.SYSTEMTABLE_QOS, priority.getPriority(header, getRequest)); + assertEquals(HConstants.SYSTEMTABLE_QOS, priority.getPriority(header, getRequest, + UserGroupInformation.createRemoteUser("someuser"))); } @Test @@ -108,7 +110,29 @@ public class TestPriorityRpc { headerBuilder.setMethodName("foo"); RequestHeader header = headerBuilder.build(); PriorityFunction qosFunc = regionServer.rpcServices.getPriority(); - assertEquals(HConstants.NORMAL_QOS, qosFunc.getPriority(header, null)); + assertEquals(HConstants.NORMAL_QOS, qosFunc.getPriority(header, null, + UserGroupInformation.createRemoteUser("someuser"))); + } + + @Test + public void testQosFunctionForRequestCalledBySuperUser() throws IOException { + RequestHeader.Builder headerBuilder = RequestHeader.newBuilder(); + headerBuilder.setMethodName("foo"); + RequestHeader header = headerBuilder.build(); + PriorityFunction qosFunc = regionServer.rpcServices.getPriority(); + + //test superusers + ((AnnotationReadingPriorityFunction) qosFunc).superUsers.add("samplesuperuser"); + UserGroupInformation remoteUgi = Mockito.mock(UserGroupInformation.class); + Mockito.when(remoteUgi.getShortUserName()).thenReturn("samplesuperuser"); + Mockito.when(remoteUgi.getGroupNames()).thenReturn(new String[]{}); + assertEquals(HConstants.ADMIN_QOS, qosFunc.getPriority(header, null, remoteUgi)); + + //test supergroups + ((AnnotationReadingPriorityFunction) qosFunc).superGroups.add("samplesupergroup"); + Mockito.when(remoteUgi.getShortUserName()).thenReturn("regularuser"); + Mockito.when(remoteUgi.getGroupNames()).thenReturn(new String[]{"samplesupergroup"}); + assertEquals(HConstants.ADMIN_QOS, qosFunc.getPriority(header, null, remoteUgi)); } @Test @@ -130,7 +154,8 @@ public class TestPriorityRpc { Mockito.when(mockRegionInfo.isSystemTable()).thenReturn(false); // Presume type. ((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS); - int qos = priority.getPriority(header, scanRequest); + int qos = priority.getPriority(header, scanRequest, + UserGroupInformation.createRemoteUser("someuser")); assertTrue ("" + qos, qos == HConstants.NORMAL_QOS); //build a scan request with scannerID @@ -148,10 +173,12 @@ public class TestPriorityRpc { // Presume type. ((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS); - assertEquals(HConstants.SYSTEMTABLE_QOS, priority.getPriority(header, scanRequest)); + assertEquals(HConstants.SYSTEMTABLE_QOS, priority.getPriority(header, scanRequest, + UserGroupInformation.createRemoteUser("someuser"))); //the same as above but with non-meta region Mockito.when(mockRegionInfo.isSystemTable()).thenReturn(false); - assertEquals(HConstants.NORMAL_QOS, priority.getPriority(header, scanRequest)); + assertEquals(HConstants.NORMAL_QOS, priority.getPriority(header, scanRequest, + UserGroupInformation.createRemoteUser("someuser"))); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java index 2b2ecda..7529c5d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.security.UserGroupInformation; import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.Mockito; @@ -64,6 +65,7 @@ public class TestQosFunction { final AnnotationReadingPriorityFunction qosf, final Message param) { RequestHeader.Builder builder = RequestHeader.newBuilder(); builder.setMethodName(methodName); - assertEquals(methodName, expected, qosf.getPriority(builder.build(), param)); + assertEquals(methodName, expected, qosf.getPriority(builder.build(), param, + UserGroupInformation.createRemoteUser("someuser"))); } } \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java index 48d0bfc..4bfcf56 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java @@ -103,7 +103,6 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.SplitTransactionFactory; import org.apache.hadoop.hbase.regionserver.SplitTransactionImpl; import org.apache.hadoop.hbase.regionserver.TestEndToEndSplitTransaction; -import org.apache.hadoop.hbase.security.access.AccessControlClient; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter; @@ -160,7 +159,7 @@ public class TestHBaseFsck { MasterSyncObserver.class.getName()); conf.setInt("hbase.regionserver.handler.count", 2); - conf.setInt("hbase.regionserver.metahandler.count", 2); + conf.setInt("hbase.regionserver.metahandler.count", 40); conf.setInt("hbase.htable.threads.max", POOL_SIZE); conf.setInt("hbase.hconnection.threads.max", 2 * POOL_SIZE); -- 1.9.5 (Apple Git-50.3)