From 5de18fd6cf0aa09f7d7c7dba63dbe551b5ac97f8 Mon Sep 17 00:00:00 2001 From: Mikhail Antonov Date: Fri, 3 Apr 2015 02:22:44 -0700 Subject: [PATCH] HBASE-13375 Provide HBase superuser higher priority over other users in the RPC handling --- .../apache/hadoop/hbase/ipc/PriorityFunction.java | 4 +- .../hadoop/hbase/ipc/SimpleRpcScheduler.java | 2 +- .../AnnotationReadingPriorityFunction.java | 48 +++++++++++++++++++++- .../hadoop/hbase/regionserver/RSRpcServices.java | 5 ++- .../hadoop/hbase/ipc/TestSimpleRpcScheduler.java | 11 +++-- .../hadoop/hbase/regionserver/TestPriorityRpc.java | 24 ++++++++--- .../hadoop/hbase/regionserver/TestQosFunction.java | 2 +- 7 files changed, 81 insertions(+), 15 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java index e323e78..765e83e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.security.UserGroupInformation; /** * Function to figure priority of incoming request. @@ -34,9 +35,10 @@ public interface PriorityFunction { * The returned value is mainly used to select the dispatch queue. * @param header * @param param + * @param remoteUgi * @return Priority of this request. */ - int getPriority(RequestHeader header, Message param); + int getPriority(RequestHeader header, Message param, UserGroupInformation remoteUgi); /** * Returns the deadline of the specified request. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index d8ae3ba..4455e17 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -192,7 +192,7 @@ public class SimpleRpcScheduler extends RpcScheduler { @Override public void dispatch(CallRunner callTask) throws InterruptedException { RpcServer.Call call = callTask.getCall(); - int level = priority.getPriority(call.getHeader(), call.param); + int level = priority.getPriority(call.getHeader(), call.param, call.getRemoteUser()); if (priorityExecutor != null && level > highPriorityLevel) { priorityExecutor.dispatch(callTask); } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java index 29228db..2a70166 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java @@ -17,8 +17,10 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.io.IOException; import java.lang.reflect.Method; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.commons.logging.Log; @@ -44,6 +46,8 @@ import org.apache.hadoop.hbase.regionserver.RSRpcServices.QosPriority; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.Message; import com.google.protobuf.TextFormat; +import org.apache.hadoop.hbase.security.visibility.VisibilityUtils; +import org.apache.hadoop.security.UserGroupInformation; /** @@ -149,12 +153,19 @@ class AnnotationReadingPriorityFunction implements PriorityFunction { * NORMAL_QOS (user requests). */ @Override - public int getPriority(RequestHeader header, Message param) { + public int getPriority(RequestHeader header, Message param, + UserGroupInformation remoteUgi) { String methodName = header.getMethodName(); Integer priorityByAnnotation = annotatedQos.get(methodName); if (priorityByAnnotation != null) { return priorityByAnnotation; } + + // all requests executed by super users have high QoS + if (isExecutedBySuperUser(remoteUgi)) { + return HConstants.ADMIN_QOS; + } + if (param == null) { return HConstants.NORMAL_QOS; } @@ -238,4 +249,39 @@ class AnnotationReadingPriorityFunction implements PriorityFunction { void setRegionServer(final HRegionServer hrs) { this.rpcServices = hrs.getRSRpcServices(); } + + /** + * @param remoteUgi ugi of user running request + * @return true if user is super user, false otherwise + */ + private boolean isExecutedBySuperUser(UserGroupInformation remoteUgi) { + if (remoteUgi == null) { + return false; + } + try { + List superUsers = + VisibilityUtils.getSystemAndSuperUsers(rpcServices.getConfiguration()).getFirst(); + List superGroups = + VisibilityUtils.getSystemAndSuperUsers(rpcServices.getConfiguration()).getSecond(); + + if (superUsers != null && superUsers.contains(remoteUgi.getShortUserName())) { + return true; + } + + String[] groups = remoteUgi.getGroupNames(); + if (groups != null && groups.length > 0) { + for (String group : groups) { + if (superGroups.contains(group)) { + return true; + } + } + } + return false; + } catch (IOException e) { + if (LOG.isTraceEnabled()) { + LOG.trace("Marking normal priority after getting exception=" + e); + } + return false; + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 1508a15..42ea559 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -169,6 +169,7 @@ import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.net.DNS; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.zookeeper.KeeperException; import com.google.protobuf.ByteString; @@ -953,8 +954,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } @Override - public int getPriority(RequestHeader header, Message param) { - return priority.getPriority(header, param); + public int getPriority(RequestHeader header, Message param, UserGroupInformation remoteUgi) { + return priority.getPriority(header, param, remoteUgi); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java index 11ac43f..3015784 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.security.UserGroupInformation; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -125,7 +126,8 @@ public class TestSimpleRpcScheduler { scheduler.init(CONTEXT); scheduler.start(); for (CallRunner task : tasks) { - when(qosFunction.getPriority((RPCProtos.RequestHeader) anyObject(), (Message) anyObject())) + when(qosFunction.getPriority((RPCProtos.RequestHeader) anyObject(), + (Message) anyObject(), (UserGroupInformation) anyObject())) .thenReturn(qos.get(task)); scheduler.dispatch(task); } @@ -157,7 +159,8 @@ public class TestSimpleRpcScheduler { schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY, queueType); PriorityFunction priority = mock(PriorityFunction.class); - when(priority.getPriority(any(RequestHeader.class), any(Message.class))) + when(priority.getPriority(any(RequestHeader.class), + any(Message.class), any(UserGroupInformation.class))) .thenReturn(HConstants.NORMAL_QOS); RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, @@ -235,8 +238,8 @@ public class TestSimpleRpcScheduler { schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f); PriorityFunction priority = mock(PriorityFunction.class); - when(priority.getPriority(any(RequestHeader.class), any(Message.class))) - .thenReturn(HConstants.NORMAL_QOS); + when(priority.getPriority(any(RequestHeader.class), any(Message.class), + any(UserGroupInformation.class))).thenReturn(HConstants.NORMAL_QOS); RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 3, 1, 1, priority, HConstants.QOS_THRESHOLD); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java index dc18408..d4f115e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.security.UserGroupInformation; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -95,7 +96,7 @@ public class TestPriorityRpc { Mockito.when(mockRegionInfo.isSystemTable()).thenReturn(true); // Presume type. ((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS); - assertEquals(HConstants.SYSTEMTABLE_QOS, priority.getPriority(header, getRequest)); + assertEquals(HConstants.SYSTEMTABLE_QOS, priority.getPriority(header, getRequest, null)); } @Test @@ -108,7 +109,20 @@ public class TestPriorityRpc { headerBuilder.setMethodName("foo"); RequestHeader header = headerBuilder.build(); PriorityFunction qosFunc = regionServer.rpcServices.getPriority(); - assertEquals(HConstants.NORMAL_QOS, qosFunc.getPriority(header, null)); + assertEquals(HConstants.NORMAL_QOS, qosFunc.getPriority(header, null, null)); + } + + @Test + public void testQosFunctionForRequestCalledBySuperUser() throws IOException { + RequestHeader.Builder headerBuilder = RequestHeader.newBuilder(); + headerBuilder.setMethodName("foo"); + RequestHeader header = headerBuilder.build(); + PriorityFunction qosFunc = regionServer.rpcServices.getPriority(); + + UserGroupInformation remoteUgi = Mockito.mock(UserGroupInformation.class); + Mockito.when(remoteUgi.getShortUserName()).thenReturn("samplesuperuser"); + regionServer.getConfiguration().set("hbase.superuser", "samplesuperuser"); + assertEquals(HConstants.ADMIN_QOS, qosFunc.getPriority(header, null, remoteUgi)); } @Test @@ -130,7 +144,7 @@ public class TestPriorityRpc { Mockito.when(mockRegionInfo.isSystemTable()).thenReturn(false); // Presume type. ((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS); - int qos = priority.getPriority(header, scanRequest); + int qos = priority.getPriority(header, scanRequest, null); assertTrue ("" + qos, qos == HConstants.NORMAL_QOS); //build a scan request with scannerID @@ -148,10 +162,10 @@ public class TestPriorityRpc { // Presume type. ((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS); - assertEquals(HConstants.SYSTEMTABLE_QOS, priority.getPriority(header, scanRequest)); + assertEquals(HConstants.SYSTEMTABLE_QOS, priority.getPriority(header, scanRequest, null)); //the same as above but with non-meta region Mockito.when(mockRegionInfo.isSystemTable()).thenReturn(false); - assertEquals(HConstants.NORMAL_QOS, priority.getPriority(header, scanRequest)); + assertEquals(HConstants.NORMAL_QOS, priority.getPriority(header, scanRequest, null)); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java index 2b2ecda..b3f6163 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java @@ -64,6 +64,6 @@ public class TestQosFunction { final AnnotationReadingPriorityFunction qosf, final Message param) { RequestHeader.Builder builder = RequestHeader.newBuilder(); builder.setMethodName(methodName); - assertEquals(methodName, expected, qosf.getPriority(builder.build(), param)); + assertEquals(methodName, expected, qosf.getPriority(builder.build(), param, null)); } } \ No newline at end of file -- 1.9.5 (Apple Git-50.3)