From 2d453d34324a6b8cf17fa04eacf9ed030114ffe6 Mon Sep 17 00:00:00 2001 From: Ashish Singhi Date: Wed, 25 Feb 2015 19:06:19 +0530 Subject: [PATCH] HBASE-13098 HBase Connection Control --- .../apache/hadoop/hbase/ipc/ConnectionControl.java | 295 +++++++++++++++++++++ .../org/apache/hadoop/hbase/ipc/RpcServer.java | 76 +++++- .../hadoop/hbase/ipc/TestConnectionControl.java | 288 ++++++++++++++++++++ 3 files changed, 656 insertions(+), 3 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionControl.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestConnectionControl.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionControl.java new file mode 100644 index 0000000..e855b52 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ConnectionControl.java @@ -0,0 +1,295 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ + +package org.apache.hadoop.hbase.ipc; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.security.AccessDeniedException; +import org.apache.hadoop.hbase.security.SecurityUtil; + +/** + * Limit the accepted connections by three checks: + *
    + *
  1. Global connection counter does not exceed the global limit.
  2. + *
  3. Counter for new coming connection to not exceed the time-window limit.
  4. + *
  5. Counter for the connections per user to not exceed the per-user limit.
  6. + *
+ * If and only if all these checks are passed, request would be accepted. Record the maximum number + * of these three counters for statistics purpose. + */ +@InterfaceAudience.Private +public class ConnectionControl { + /** Maximum connections limited per server */ + static final String CONNECTION_CONTROL_MAX_CONNECTIONS_KEY = + "hbase.connectioncontrol.maxconnections"; + /** Maximum connections limited per user */ + static final String CONNECTION_CONTROL_MAX_CONNECTIONS_PER_USER_KEY = + "hbase.connectioncontrol.maxconnectionsperuser"; + /** Connections control time period in seconds */ + static final String CONNECTION_CONTROL_LIMIT_PERIOD_KEY = "hbase.connectioncontrol.limitperiod"; + /** Maximum connections allowed in a period */ + static final String CONNECTION_CONTROL_MAX_CONNECTIONS_IN_PERIOD_KEY = + "hbase.connectioncontrol.maxconnectionsinperiod"; + /** Master kdc principal */ + private static final String MASTER_PRINCIPAL = "hbase.master.kerberos.principal"; + /** RegionServer kdc principal */ + private static final String REGIONSERVER_PRINCIPAL = "hbase.regionserver.kerberos.principal"; + + private static final int CONNECTION_UNLIMITED = Integer.MAX_VALUE; + private static final int DEFAULT_LIMIT_PERIOD = 10; + + private final Log LOG; + + private static String MASTER_USER_NAME; + private static String REGIONSERVER_USER_NAME; + + private static int CONNECTION_CONTROL_MAX_CONNECTIONS; + private static int CONNECTION_CONTROL_MAX_CONNECTIONS_PER_USER; + private static int CONNECTION_CONTROL_MAX_CONNECTIONS_IN_PERIOD; + private static int CONNECTION_CONTROL_LIMIT_PERIOD; + + private volatile long totalConnectionsNum = 0; + private Map userConnectionsNum = new HashMap(16); + private volatile long periodConnectionsNum = 0; + + private DelayQueue connectionQueue = new DelayQueue(); + private Thread daemonThread; + private volatile boolean running = true; + + // Used by DelayQueue + private static class DelayItem implements Delayed { + /** Base of nanosecond timings, to avoid wrapping */ + private static final long NANO_ORIGIN = System.nanoTime(); + + /** + * Returns nanosecond time offset by origin + */ + final static long now() { + return System.nanoTime() - NANO_ORIGIN; + } + + /** + * Sequence number to break scheduling ties, and in turn to guarantee FIFO order among tied + * entries. + */ + private static final AtomicLong sequencer = new AtomicLong(0); + + /** Sequence number to break ties FIFO */ + private long sequenceNumber; + + /** The time the task is enabled to execute in nanoTime units */ + private final long time; + + DelayItem(long timeout) { + this.time = now() + timeout; + this.sequenceNumber = sequencer.getAndIncrement(); + } + + @Override + public long getDelay(TimeUnit unit) { + long d = unit.convert(time - now(), TimeUnit.NANOSECONDS); + return d; + } + + @Override + public int compareTo(Delayed other) { + if (other == this) {// compare zero ONLY if same object + return 0; + } + if (other instanceof DelayItem) { + DelayItem x = (DelayItem) other; + long diff = time - x.time; + if (diff < 0) { + return -1; + } else if (diff > 0) { + return 1; + } else if (sequenceNumber < x.sequenceNumber) { + return -1; + } else { + return 1; + } + } + long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS)); + return (d == 0) ? 0 : ((d < 0) ? -1 : 1); + } + + } + + public ConnectionControl(Configuration conf, Log LOG) { + this.LOG = LOG; + + MASTER_USER_NAME = + (conf.get(MASTER_PRINCIPAL) != null ? SecurityUtil.getUserFromPrincipal(conf + .get(MASTER_PRINCIPAL)) : null); + if (null == MASTER_USER_NAME) { + LOG.error("Configuration parameter (" + MASTER_PRINCIPAL + ") is not set."); + } + + REGIONSERVER_USER_NAME = + (conf.get(REGIONSERVER_PRINCIPAL) != null ? SecurityUtil.getUserFromPrincipal(conf + .get(REGIONSERVER_PRINCIPAL)) : null); + if (null == REGIONSERVER_USER_NAME) { + LOG.error("Configuration parameter (" + REGIONSERVER_PRINCIPAL + ") is not set."); + } + + int tempNumber = 0; + + tempNumber = conf.getInt(CONNECTION_CONTROL_MAX_CONNECTIONS_KEY, CONNECTION_UNLIMITED); + CONNECTION_CONTROL_MAX_CONNECTIONS = (tempNumber < 0 ? CONNECTION_UNLIMITED : tempNumber); + + tempNumber = conf.getInt(CONNECTION_CONTROL_MAX_CONNECTIONS_PER_USER_KEY, CONNECTION_UNLIMITED); + CONNECTION_CONTROL_MAX_CONNECTIONS_PER_USER = + (tempNumber < 0 ? CONNECTION_UNLIMITED : tempNumber); + + tempNumber = + conf.getInt(CONNECTION_CONTROL_MAX_CONNECTIONS_IN_PERIOD_KEY, CONNECTION_UNLIMITED); + CONNECTION_CONTROL_MAX_CONNECTIONS_IN_PERIOD = + (tempNumber < 0 ? CONNECTION_UNLIMITED : tempNumber); + + tempNumber = conf.getInt(CONNECTION_CONTROL_LIMIT_PERIOD_KEY, DEFAULT_LIMIT_PERIOD); + CONNECTION_CONTROL_LIMIT_PERIOD = (tempNumber <= 0 ? 0 : tempNumber); + + if (LOG.isDebugEnabled()) { + LOG.debug("Configuration properties value set for connection control are:- " + + CONNECTION_CONTROL_MAX_CONNECTIONS_KEY + ": " + CONNECTION_CONTROL_MAX_CONNECTIONS + + ", " + CONNECTION_CONTROL_MAX_CONNECTIONS_PER_USER_KEY + ": " + + CONNECTION_CONTROL_MAX_CONNECTIONS_PER_USER + ", " + + CONNECTION_CONTROL_LIMIT_PERIOD_KEY + ": " + CONNECTION_CONTROL_LIMIT_PERIOD + ", " + + CONNECTION_CONTROL_MAX_CONNECTIONS_IN_PERIOD_KEY + ": " + + CONNECTION_CONTROL_MAX_CONNECTIONS_IN_PERIOD); + } + } + + public void init() { + Runnable daemonTask = new Runnable() { + public void run() { + daemonCheckConnectionQueue(); + } + }; + + daemonThread = new Thread(daemonTask); + daemonThread.setDaemon(true); + daemonThread.setName("Connection_Control_Daemon_Thread"); + daemonThread.start(); + } + + /** + * stop the daemon thread + */ + public void destroy() { + if (daemonThread != null) { + running = false; + daemonThread.interrupt(); + } + } + + /** + * Check the period connection control queue, and take out the over due connections + * @return + */ + private void daemonCheckConnectionQueue() { + try { + while (running) { + connectionQueue.take(); + periodConnectionsNum = connectionQueue.size(); + // Sleep for one second before next check to avoid utilizing 100% of a CPU core + Thread.sleep(1000); + } + } catch (InterruptedException e) { + LOG.warn("Interrupted exception occured while running daemon check for connection queue!"); + LOG.debug("Failure details regarding interrupted exception.", e); + Thread.currentThread().interrupt(); + } + } + + /** + * Check and update the connection number counters. If it satisfies all the 3 conditions of + * connections number control then return normally. Otherwise, throw an exception. + * @param userName name of the user requesting the connection + * @throws AccessDeniedException if connection request should not be granted + * @return + */ + public synchronized void connectionRequest(String userName) throws AccessDeniedException { + // Step1: Update totalConnectionsNum and userConnectionsNum + if (userName != null && !userName.equals(MASTER_USER_NAME) + && !userName.equals(REGIONSERVER_USER_NAME)) { + totalConnectionsNum++; + + Long userConnections = userConnectionsNum.get(userName); + if (null == userConnections) { + userConnections = Long.valueOf(0); + } + userConnectionsNum.put(userName, ++userConnections); + + // Step2: Check number of connections overflows the limited size or not. + if (totalConnectionsNum > CONNECTION_CONTROL_MAX_CONNECTIONS) { + throw new AccessDeniedException("Overflow maximum number of connections limit. " + + "Total Number of connections = " + totalConnectionsNum + + ", maximum number of connections allowed = " + CONNECTION_CONTROL_MAX_CONNECTIONS); + } + + if (userConnections > CONNECTION_CONTROL_MAX_CONNECTIONS_PER_USER) { + throw new AccessDeniedException("Overflow maximum number of connections per user limit. " + + "Total Number of connections for user '" + userName + "' = " + userConnections + + ", maximum number of connections per user allowed = " + + CONNECTION_CONTROL_MAX_CONNECTIONS_PER_USER); + } + + if (CONNECTION_CONTROL_LIMIT_PERIOD > 0) { + if (periodConnectionsNum >= CONNECTION_CONTROL_MAX_CONNECTIONS_IN_PERIOD) { + throw new AccessDeniedException( + "Overflow maximum number of connections in a time frame limit. " + + "Total Number of connections in the time frame = " + periodConnectionsNum + + ", maximum number of connections in a time frame allowed = " + + CONNECTION_CONTROL_MAX_CONNECTIONS_IN_PERIOD); + } + + long nanoTime = + TimeUnit.NANOSECONDS.convert(CONNECTION_CONTROL_LIMIT_PERIOD, TimeUnit.SECONDS); + connectionQueue.put(new DelayItem(nanoTime)); + periodConnectionsNum = connectionQueue.size(); + } + } + } + + /** + * Check and update the connection number counters when a connection is finished + * @param String userName: name of the user requested the connection + * @return + */ + public synchronized void connectionFinished(String userName) { + if (userName != null && !userName.equals(MASTER_USER_NAME) + && !userName.equals(REGIONSERVER_USER_NAME)) { + if (totalConnectionsNum <= 0) { + LOG.warn("Total connections number is: " + totalConnectionsNum + ", it is a invalid value."); + } else { + --totalConnectionsNum; + } + + Long userConnections = userConnectionsNum.get(userName); + if (null == userConnections || userConnections <= 0) { + LOG.warn("Connections counter for user '" + userName + "' is " + userConnections + + ", it is a invalid value"); + } else { + userConnectionsNum.put(userName, --userConnections); + } + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 064771c..4aa9e82 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -67,8 +67,6 @@ import javax.security.sasl.SaslServer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.DoNotRetryIOException; @@ -78,6 +76,8 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Operation; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.exceptions.RegionMovedException; @@ -119,8 +119,8 @@ import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.StringUtils; -import org.codehaus.jackson.map.ObjectMapper; import org.apache.htrace.TraceInfo; +import org.codehaus.jackson.map.ObjectMapper; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.BlockingService; @@ -159,6 +159,11 @@ public class RpcServer implements RpcServerInterface { private final boolean authorize; private boolean isSecurityEnabled; + // Controller for controlling number of connections allowed + private ConnectionControl connectionController = null; + + public static final String CONF_CONNECTIONCONTROL_ENABLE = "hbase.connectioncontrol.enable"; + public static final byte CURRENT_VERSION = 0; /** @@ -1691,6 +1696,58 @@ public class RpcServer implements RpcServerInterface { throw new AccessDeniedException("Connection from " + this + " for service " + connectionHeader.getServiceName() + " is unauthorized for user: " + user); } + if (!connectionNumberCheck()) { + throw new AccessDeniedException("Connection from " + this + " for service " + + connectionHeader.getServiceName() + " is rejected for user: " + user); + } + } + } + + /** + * Check whether connection request should be granted or not. + * @return boolean true if connection request is granted else false + * @throws IOException if some I/O error occurs + */ + private boolean connectionNumberCheck() throws IOException { + if (null != connectionController) { + String userName = null; + try { + if (user != null) { + userName = user.getShortUserName(); + } + connectionController.connectionRequest(userName); + if (LOG.isDebugEnabled()) { + LOG.debug("Accept the connection from user: " + + (userName != null ? userName : "Unknown")); + } + } catch (AccessDeniedException ae) { + if (LOG.isErrorEnabled()) { + LOG.error("Reject the connection from user: " + + (userName != null ? userName : "Unknown") + ", for reason: " + ae.getMessage(), + ae); + } + setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage()); + responder.doRespond(authFailedCall); + return false; + } + } + return true; + } + + /** + * update the connection counters on a connection close + */ + private void updateConnectionCountersOnConnectionClose() { + if (null != connectionController) { + String userName = null; + if (user != null) { + userName = user.getShortUserName(); + } + connectionController.connectionFinished(userName); + if (LOG.isDebugEnabled()) { + LOG.debug("Connection from user: " + (userName != null ? userName : "Unknown") + + " will be closed."); + } } } @@ -1811,6 +1868,9 @@ public class RpcServer implements RpcServerInterface { disposeSasl(); data = null; this.dataLengthBuffer = null; + + updateConnectionCountersOnConnectionClose(); + if (!channel.isOpen()) return; try {socket.shutdownOutput();} catch(Exception ignored) {} // FindBugs DE_MIGHT_IGNORE @@ -1922,6 +1982,13 @@ public class RpcServer implements RpcServerInterface { this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled(); if (isSecurityEnabled) { HBaseSaslRpcServer.init(conf); + if (conf.getBoolean(CONF_CONNECTIONCONTROL_ENABLE, false)) { + this.connectionController = new ConnectionControl(conf, LOG); + this.connectionController.init(); + } else { + LOG.debug("Connection control is disabled. To enable it set " + + CONF_CONNECTIONCONTROL_ENABLE + " to true."); + } } this.scheduler = scheduler; this.scheduler.init(new RpcSchedulerContext(this)); @@ -2137,6 +2204,9 @@ public class RpcServer implements RpcServerInterface { listener.doStop(); responder.interrupt(); scheduler.stop(); + if (connectionController != null) { + connectionController.destroy(); + } notifyAll(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestConnectionControl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestConnectionControl.java new file mode 100644 index 0000000..257e44a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestConnectionControl.java @@ -0,0 +1,288 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import static org.junit.Assert.fail; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.security.AccessDeniedException; +import org.apache.hadoop.hbase.security.SecurityUtil; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests ConnectionControl mechanism + */ +public class TestConnectionControl { + + private static final Log LOG = LogFactory.getLog(TestConnectionControl.class); + + private Configuration conf; + + private ConnectionControl connectionController = null; + + private static final String MASTER_USER_NAME = "hbase/_HOST@HADOOP.COM"; + private static final String MASTER1_USER_NAME = "hbase/DUMMY-HOST1@HADOOP.COM"; + private static final String REGIONSERVER_USER_NAME = "hbase/_HOST@HADOOP.COM"; + private static final String REGIONSERVER1_USER_NAME = "hbase/DUMMY-HOST1@HADOOP.COM"; + private static final String TEST1_USER_NAME = "test1/DUMMY-HOST@HADOOP.COM"; + private static final String TEST2_USER_NAME = "test2/DUMMY-HOST@HADOOP.COM"; + private static final String TEST3_USER_NAME = "test3/OTHER-HOST@HADOOP.COM"; + + @Before + public void setUp() throws Exception { + Logger.getRootLogger().setLevel(Level.DEBUG); + this.conf = HBaseConfiguration.create(); + // Enable kerberos setup + this.conf.setBoolean("hadoop.security.authorization", true); + this.conf.set("hadoop.security.authentication", "kerberos"); + this.conf.setBoolean("hbase.security.authorization", true); + this.conf.set("hbase.security.authentication", "kerberos"); + this.conf.set("hbase.master.kerberos.principal", MASTER_USER_NAME); + this.conf.set("hbase.regionserver.kerberos.principal", REGIONSERVER_USER_NAME); + + // Enable connection control mechanism + this.conf.setBoolean(RpcServer.CONF_CONNECTIONCONTROL_ENABLE, true); + this.conf.setInt(ConnectionControl.CONNECTION_CONTROL_MAX_CONNECTIONS_KEY, 5); + this.conf.setInt(ConnectionControl.CONNECTION_CONTROL_MAX_CONNECTIONS_PER_USER_KEY, 2); + this.conf.setInt(ConnectionControl.CONNECTION_CONTROL_LIMIT_PERIOD_KEY, 10); + this.conf.setInt(ConnectionControl.CONNECTION_CONTROL_MAX_CONNECTIONS_IN_PERIOD_KEY, 2); + } + + @Test(timeout = 300000) + public void testConnectionControlMechanism() { + // # Test 1. Check if ConnectionControl is enabled with security as kerberos + if (conf.getBoolean(RpcServer.CONF_CONNECTIONCONTROL_ENABLE, false)) { + this.connectionController = new ConnectionControl(conf, LOG); + this.connectionController.init(); + } else { + fail("Connection control should be enabled"); + } + + // # Test 2. Check if null users are assumed to be system calls + String userName = null; + + // Test connectionRequest for 6 users. + // With the assumption that null user is system, below call should be success + for (int i = 0; i < 6; i++) { + try { + // ConnectionControl + this.connectionController.connectionRequest(userName); + } catch (Exception e) { + fail("Null user is assumed system call. ConnectionControl should succeed."); + } + } + + // Test connection termination + for (int i = 0; i < 6; i++) { + try { + this.connectionController.connectionFinished(userName); + } catch (Exception e) { + fail("Null user is assumed system call. ConnectionControl should succeed."); + } + } + // # Test 3. Check if MASTER_USER_NAME users are considered to be system calls + // Test incoming connectionRequest for 6 MASTER_USER_NAME, since maxConnections is configured 5 + for (int i = 0; i < 6; i++) { + try { + // ConnectionControl + this.connectionController.connectionRequest(SecurityUtil + .getUserFromPrincipal(MASTER_USER_NAME)); + } catch (Exception e) { + fail("Master username is assumed system call. There should not be any limit on it."); + } + } + + // # Test 4. Check if REGIONSERVER_USER_NAME users are considered to be system calls + // Test incoming connectionRequest for 6 REGIONSERVER_USER_NAME, since maxConnections is + // configured 5 + for (int i = 0; i < 6; i++) { + try { + this.connectionController.connectionRequest(SecurityUtil + .getUserFromPrincipal(REGIONSERVER_USER_NAME)); + } catch (Exception e) { + fail("RegionServer username is assumed system call. There should not be any limit on it."); + } + } + + try { + this.connectionController.connectionRequest(SecurityUtil + .getUserFromPrincipal(MASTER1_USER_NAME)); + } catch (Exception e) { + fail("Master username is assumed system call. Connections should not be limited"); + } + + try { + this.connectionController.connectionRequest(SecurityUtil + .getUserFromPrincipal(REGIONSERVER1_USER_NAME)); + } catch (Exception e) { + fail("RegionServer username is assumed system call. Connections should not be limited"); + } + + // Test connection termination for Master + for (int i = 0; i < 6; i++) { + try { + this.connectionController.connectionFinished(SecurityUtil + .getUserFromPrincipal(MASTER_USER_NAME)); + } catch (Exception e) { + fail("Master username is assumed system call. ConnectionControl connectionFinished should succeed."); + } + } + // Test connection termination for RegionServer + for (int i = 0; i < 6; i++) { + try { + this.connectionController.connectionFinished(SecurityUtil + .getUserFromPrincipal(REGIONSERVER_USER_NAME)); + } catch (Exception e) { + fail("RegionServer username is assumed system call. ConnectionControl connectionFinished should return true"); + } + } + + try { + this.connectionController.connectionFinished(SecurityUtil + .getUserFromPrincipal(MASTER1_USER_NAME)); + } catch (Exception e) { + fail("Master username is assumed system call. ConnectionControl connectionFinished should return true"); + } + + try { + this.connectionController.connectionFinished(SecurityUtil + .getUserFromPrincipal(REGIONSERVER1_USER_NAME)); + } catch (Exception e) { + fail("RegionServer username is assumed system call. ConnectionControl connectionFinished should return true"); + } + + // # Test 5. Check if TEST1_USER_NAME users are considered for connection limitation + // configurations + // Test incoming connectionRequest for 2 TEST1_USER_NAME, since maxConnectionsPerUser is + // configured 2 + for (int i = 0; i < 2; i++) { + try { + this.connectionController.connectionRequest(SecurityUtil + .getUserFromPrincipal(TEST1_USER_NAME)); + } catch (Exception e) { + fail("maxConnections, maxConnectionsPerUser & maxConnectionsInPeriod limit not exceeded. " + + "ConnectionControl connectionRequest should succeed."); + } + } + + // maxConnectionsPerUser limit exceeded + try { + this.connectionController.connectionRequest(SecurityUtil + .getUserFromPrincipal(TEST1_USER_NAME)); + fail("maxConnectionsPerUser limit exceeded. Connection should be restricted"); + } catch (AccessDeniedException ade) { + LOG.info("Max connections per user limit=" + + this.conf.get(ConnectionControl.CONNECTION_CONTROL_MAX_CONNECTIONS_PER_USER_KEY) + + " has been exceeded!"); + this.connectionController.connectionFinished(SecurityUtil + .getUserFromPrincipal(TEST1_USER_NAME)); + } + + // maxConnectionsInPeriod for time limit exceeded + try { + this.connectionController.connectionRequest(SecurityUtil + .getUserFromPrincipal(TEST2_USER_NAME)); + fail("maxConnectionsInPeriod limit exceeded. Connection should be restricted"); + } catch (AccessDeniedException e) { + LOG.info("Max connections in period limit=" + + this.conf.get(ConnectionControl.CONNECTION_CONTROL_MAX_CONNECTIONS_IN_PERIOD_KEY) + + " has been exceeded!"); + this.connectionController.connectionFinished(SecurityUtil + .getUserFromPrincipal(TEST2_USER_NAME)); + } + + // Sleep for 12 sec because TimeLimit period is 10 sec + try { + Thread.sleep(12000); + } catch (InterruptedException e1) { + LOG.warn("Thread sleep for twelve seconds interrupted."); + } + for (int i = 0; i < 2; i++) { + try { + this.connectionController.connectionRequest(SecurityUtil + .getUserFromPrincipal(TEST2_USER_NAME)); + } catch (Exception e) { + fail("maxConnections, maxConnectionsPerUser & maxConnectionsInPeriod limit not exceeded. " + + "ConnectionControl should return true"); + } + } + + // Sleep for 12 sec because TimeLimit period is 10 seconds + try { + Thread.sleep(12000); + } catch (InterruptedException e1) { + LOG.warn("Thread sleep for twelve seconds interrupted."); + } + for (int i = 0; i < 2; i++) { + try { + this.connectionController.connectionRequest(SecurityUtil + .getUserFromPrincipal(TEST3_USER_NAME)); + if (i > 0) { + fail("maxConnections limit exceeded. Connection should be restricted"); + } + } catch (Exception e) { + if (i <= 0) { + fail("maxConnections, maxConnectionsPerUser & maxConnectionsInPeriod limit not exceeded. " + + "ConnectionControl should return true"); + } else { + LOG.info("Max connections global limit=" + + this.conf.get(ConnectionControl.CONNECTION_CONTROL_MAX_CONNECTIONS_KEY) + + " has been exceeded!"); + } + } + } + + // Test connection termination for TEST1_USER_NAME + for (int i = 0; i < 2; i++) { + try { + this.connectionController.connectionFinished(SecurityUtil + .getUserFromPrincipal(TEST1_USER_NAME)); + } catch (Exception e) { + fail("RegionServer username is assumed system call. ConnectionControl should return true"); + } + } + + // Test connection termination for TEST2_USER_NAME + for (int i = 0; i < 2; i++) { + try { + this.connectionController.connectionFinished(SecurityUtil + .getUserFromPrincipal(TEST2_USER_NAME)); + } catch (Exception e) { + fail("RegionServer username is assumed system call. ConnectionControl should return true"); + } + } + + // Test connection termination for TEST3_USER_NAME + for (int i = 0; i < 2; i++) { + try { + this.connectionController.connectionFinished(SecurityUtil + .getUserFromPrincipal(TEST3_USER_NAME)); + } catch (Exception e) { + fail("RegionServer username is assumed system call. ConnectionControl should return true"); + } + } + + // This log a warn message as there are no more connections left unfinished for TEST2_USER_NAME + try { + this.connectionController.connectionFinished(SecurityUtil + .getUserFromPrincipal(TEST2_USER_NAME)); + } catch (Exception e) { + fail("RegionServer username is assumed system call. ConnectionControl should return true"); + } + connectionController.destroy(); + } +} -- 1.9.2.msysgit.0