diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 10b364a..305e9dc 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2455,6 +2455,17 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal " PERFORMANCE: Execution + Performance logs \n" + " VERBOSE: All logs" ), + // HS2 connections guard rails + HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER("hive.server2.limit.connections.per.user", 0, + "Maximum hive server2 connections per user. Any user exceeding this limit will not be allowed to connect. " + + "Default=0 does not enforce limits."), + HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS("hive.server2.limit.connections.per.ipaddress", 0, + "Maximum hive server2 connections per ipaddress. Any ipaddress exceeding this limit will not be allowed " + + "to connect. Default=0 does not enforce limits."), + HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS("hive.server2.limit.connections.per.user.ipaddress", 0, + "Maximum hive server2 connections per user:ipaddress combination. Any user-ipaddress exceeding this limit will " + + "not be allowed to connect. Default=0 does not enforce limits."), + // Enable metric collection for HiveServer2 HIVE_SERVER2_METRICS_ENABLED("hive.server2.metrics.enabled", false, "Enable metrics on the HiveServer2."), diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java index ebcf4a8..1ee3a50 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -50,7 +50,7 @@ public void testLeakOperationHandle() throws HiveSQLException { HiveConf serverhiveConf = new HiveConf(); String ipAddress = null; HiveSessionImpl session = new HiveSessionImpl(null, protocol, username, password, - serverhiveConf, ipAddress) { + serverhiveConf, ipAddress, null) { @Override protected synchronized void acquire(boolean userAccess, boolean isOperation) { } diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java index 9436a25..ac105bf 100644 --- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java +++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionBase.java @@ -19,6 +19,7 @@ package org.apache.hive.service.cli.session; import java.io.File; +import java.util.List; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.session.SessionState; @@ -84,6 +85,10 @@ void setIpAddress(String ipAddress); + List getForwardedAddresses(); + + void setForwardedAddresses(List forwardedAddresses); + long getLastAccessTime(); long getCreationTime(); diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index 0206fe3..7fbcd13 100644 --- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -102,6 +102,7 @@ // 2) Some parts of session state, like mrStats and vars, need proper synchronization. private SessionState sessionState; private String ipAddress; + private List forwardedAddresses; private static final String FETCH_WORK_SERDE_CLASS = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"; @@ -122,13 +123,15 @@ public HiveSessionImpl(SessionHandle sessionHandle, TProtocolVersion protocol, - String username, String password, HiveConf serverConf, String ipAddress) { + String username, String password, HiveConf serverConf, String ipAddress, + final List forwardedAddresses) { this.username = username; this.password = password; creationTime = System.currentTimeMillis(); this.sessionHandle = sessionHandle != null ? sessionHandle : new SessionHandle(protocol); this.sessionConf = new HiveConf(serverConf); this.ipAddress = ipAddress; + this.forwardedAddresses = forwardedAddresses; this.operationLock = serverConf.getBoolVar( ConfVars.HIVE_SERVER2_PARALLEL_OPS_IN_SESSION) ? null : new Semaphore(1); try { @@ -927,6 +930,16 @@ public void setIpAddress(String ipAddress) { } @Override + public List getForwardedAddresses() { + return forwardedAddresses; + } + + @Override + public void setForwardedAddresses(final List forwardedAddresses) { + this.forwardedAddresses = forwardedAddresses; + } + + @Override public String getDelegationToken(HiveAuthFactory authFactory, String owner, String renewer) throws HiveSQLException { HiveAuthFactory.verifyProxyAccess(getUserName(), owner, getIpAddress(), getHiveConf()); diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java index 8975aee..32598d3 100644 --- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java +++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java @@ -19,6 +19,7 @@ package org.apache.hive.service.cli.session; import java.io.IOException; +import java.util.List; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.conf.HiveConf; @@ -49,8 +50,9 @@ private HiveSession proxySession = null; public HiveSessionImplwithUGI(SessionHandle sessionHandle, TProtocolVersion protocol, String username, - String password, HiveConf hiveConf, String ipAddress, String delegationToken) throws HiveSQLException { - super(sessionHandle, protocol, username, password, hiveConf, ipAddress); + String password, HiveConf hiveConf, String ipAddress, String delegationToken, + final List forwardedAddresses) throws HiveSQLException { + super(sessionHandle, protocol, username, password, hiveConf, ipAddress, forwardedAddresses); setSessionUGI(username); setDelegationToken(delegationToken); } diff --git a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java index 9b2ae57..1846c91 100644 --- a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java +++ b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java @@ -33,6 +33,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; @@ -43,7 +44,6 @@ import org.apache.hadoop.hive.common.metrics.common.MetricsVariable; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.ql.hooks.HookUtils; import org.apache.hadoop.hive.ql.hooks.HooksLoader; import org.apache.hive.service.CompositeService; import org.apache.hive.service.cli.HiveSQLException; @@ -68,6 +68,10 @@ private HiveConf hiveConf; private final Map handleToSession = new ConcurrentHashMap(); + private final Map connectionsCount = new ConcurrentHashMap<>(); + private int userLimit; + private int ipAddressLimit; + private int userIpAddressLimit; private final OperationManager operationManager = new OperationManager(); private ThreadPoolExecutor backgroundOperationPool; private boolean isOperationLogEnabled; @@ -103,6 +107,12 @@ public synchronized void init(HiveConf hiveConf) { registerOpenSesssionMetrics(metrics); registerActiveSesssionMetrics(metrics); } + + userLimit = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER); + ipAddressLimit = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS); + userIpAddressLimit = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS); + LOG.info("Connections limit are user: {} ipaddress: {} user-ipaddress: {}", userLimit, ipAddressLimit, + userIpAddressLimit); super.init(hiveConf); } @@ -368,6 +378,10 @@ public HiveSession createSession(SessionHandle sessionHandle, TProtocolVersion p String delegationToken) throws HiveSQLException { + // if client proxies connection, use forwarded ip-addresses instead of just the gateway + final List forwardedAddresses = getForwardedAddresses(); + incrementConnections(username, ipAddress, forwardedAddresses); + HiveSession session; // If doAs is set to true for HiveServer2, we will create a proxy object for the session impl. // Within the proxy object, we wrap the method call in a UserGroupInformation#doAs @@ -375,16 +389,16 @@ public HiveSession createSession(SessionHandle sessionHandle, TProtocolVersion p HiveSessionImplwithUGI hiveSessionUgi; if (sessionImplWithUGIclassName == null) { hiveSessionUgi = new HiveSessionImplwithUGI(sessionHandle, protocol, username, password, - hiveConf, ipAddress, delegationToken); + hiveConf, ipAddress, delegationToken, forwardedAddresses); } else { try { Class clazz = Class.forName(sessionImplWithUGIclassName); Constructor constructor = clazz.getConstructor(SessionHandle.class, TProtocolVersion.class, String.class, - String.class, HiveConf.class, String.class, String.class); + String.class, HiveConf.class, String.class, String.class, List.class); hiveSessionUgi = (HiveSessionImplwithUGI) constructor.newInstance(sessionHandle, - protocol, username, password, hiveConf, ipAddress, delegationToken); + protocol, username, password, hiveConf, ipAddress, delegationToken, forwardedAddresses); } catch (Exception e) { - throw new HiveSQLException("Cannot initilize session class:" + sessionImplWithUGIclassName); + throw new HiveSQLException("Cannot initialize session class:" + sessionImplWithUGIclassName); } } session = HiveSessionProxy.getProxy(hiveSessionUgi, hiveSessionUgi.getSessionUgi()); @@ -392,14 +406,14 @@ public HiveSession createSession(SessionHandle sessionHandle, TProtocolVersion p } else { if (sessionImplclassName == null) { session = new HiveSessionImpl(sessionHandle, protocol, username, password, hiveConf, - ipAddress); + ipAddress, forwardedAddresses); } else { try { Class clazz = Class.forName(sessionImplclassName); Constructor constructor = clazz.getConstructor(SessionHandle.class, TProtocolVersion.class, - String.class, String.class, HiveConf.class, String.class); + String.class, String.class, HiveConf.class, String.class, List.class); session = (HiveSession) constructor.newInstance(sessionHandle, protocol, username, password, - hiveConf, ipAddress); + hiveConf, ipAddress, forwardedAddresses); } catch (Exception e) { throw new HiveSQLException("Cannot initilize session class:" + sessionImplclassName, e); } @@ -439,6 +453,95 @@ public HiveSession createSession(SessionHandle sessionHandle, TProtocolVersion p return session; } + private void incrementConnections(final String username, final String ipAddress, + final List forwardedAddresses) throws HiveSQLException { + final String clientIpAddress = getOriginClientIpAddress(ipAddress, forwardedAddresses); + + String violation = anyViolations(username, clientIpAddress); + // increment the counters only when there are no violations + if (violation == null) { + if (trackConnectionsPerUser(username)) { + connectionsCount.computeIfAbsent(username, k -> new LongAdder()).increment(); + } + + if (trackConnectionsPerIpAddress(clientIpAddress)) { + connectionsCount.computeIfAbsent(clientIpAddress, k -> new LongAdder()).increment(); + } + + if (trackConnectionsPerUserIpAddress(username, clientIpAddress)) { + connectionsCount.computeIfAbsent(username + ":" + clientIpAddress, k -> new LongAdder()).increment(); + } + } else { + LOG.error(violation); + throw new HiveSQLException(violation); + } + } + + private String getOriginClientIpAddress(final String ipAddress, final List forwardedAddresses) { + if (forwardedAddresses == null || forwardedAddresses.isEmpty()) { + return ipAddress; + } + // order of forwarded ips per X-Forwarded-For http spec (client, proxy1, proxy2) + return forwardedAddresses.get(0); + } + + private void decrementConnections(final HiveSession session) { + final String username = session.getUserName(); + final String clientIpAddress = getOriginClientIpAddress(session.getIpAddress(), session.getForwardedAddresses()); + if (trackConnectionsPerUser(username)) { + connectionsCount.computeIfPresent(username, (k, v) -> v).decrement(); + } + + if (trackConnectionsPerIpAddress(clientIpAddress)) { + connectionsCount.computeIfPresent(clientIpAddress, (k, v) -> v).decrement(); + } + + if (trackConnectionsPerUserIpAddress(username, clientIpAddress)) { + connectionsCount.computeIfPresent(username + ":" + clientIpAddress, (k, v) -> v).decrement(); + } + } + + private String anyViolations(final String username, final String ipAddress) { + if (trackConnectionsPerUser(username) && !withinLimits(username, userLimit)) { + return "Connection limit per user reached (user: " + username + " limit: " + userLimit + ")"; + } + + if (trackConnectionsPerIpAddress(ipAddress) && !withinLimits(ipAddress, ipAddressLimit)) { + return "Connection limit per ipaddress reached (ipaddress: " + ipAddress + " limit: " + ipAddressLimit + ")"; + } + + if (trackConnectionsPerUserIpAddress(username, ipAddress) && + !withinLimits(username + ":" + ipAddress, userIpAddressLimit)) { + return "Connection limit per user:ipaddress reached (user:ipaddress: " + username + ":" + ipAddress + " limit: " + + userIpAddressLimit + ")"; + } + + return null; + } + + private boolean trackConnectionsPerUserIpAddress(final String username, final String ipAddress) { + return userIpAddressLimit > 0 && username != null && !username.isEmpty() && ipAddress != null && + !ipAddress.isEmpty(); + } + + private boolean trackConnectionsPerIpAddress(final String ipAddress) { + return ipAddressLimit > 0 && ipAddress != null && !ipAddress.isEmpty(); + } + + private boolean trackConnectionsPerUser(final String username) { + return userLimit > 0 && username != null && !username.isEmpty(); + } + + private boolean withinLimits(final String track, final int limit) { + if (connectionsCount.containsKey(track)) { + final int connectionCount = connectionsCount.get(track).intValue(); + if (connectionCount >= limit) { + return false; + } + } + return true; + } + public synchronized void closeSession(SessionHandle sessionHandle) throws HiveSQLException { HiveSession session = handleToSession.remove(sessionHandle); if (session == null) { @@ -448,6 +551,7 @@ public synchronized void closeSession(SessionHandle sessionHandle) throws HiveSQ try { session.close(); } finally { + decrementConnections(session); // Shutdown HiveServer2 if it has been deregistered from ZooKeeper and has no active sessions if (!(hiveServer2 == null) && (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) && (hiveServer2.isDeregisteredWithZooKeeper())) { diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 6354c8c..fc9e6b2 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -320,8 +320,6 @@ public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException { LOG.info("Client protocol version: " + req.getClient_protocol()); TOpenSessionResp resp = new TOpenSessionResp(); try { - Map openConf = req.getConfiguration(); - SessionHandle sessionHandle = getSessionHandle(req, resp); resp.setSessionHandle(sessionHandle.toTSessionHandle()); Map configurationMap = new HashMap(); diff --git a/service/src/test/org/apache/hive/service/cli/TestCLIServiceConnectionLimits.java b/service/src/test/org/apache/hive/service/cli/TestCLIServiceConnectionLimits.java new file mode 100644 index 0000000..5ecea9a --- /dev/null +++ b/service/src/test/org/apache/hive/service/cli/TestCLIServiceConnectionLimits.java @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.service.cli; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hive.service.cli.session.SessionManager; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import com.google.common.collect.Lists; + +public class TestCLIServiceConnectionLimits { + @org.junit.Rule + public ExpectedException thrown = ExpectedException.none(); + + private int limit = 10; + private HiveConf conf = new HiveConf(); + + @Test + public void testNoLimit() throws HiveSQLException { + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 0); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 0); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 0); + CLIService service = getService(conf); + List sessionHandles = new ArrayList<>(); + try { + for (int i = 0; i < limit + 1; i++) { + SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "foo", "bar", "127.0.0.1", null); + sessionHandles.add(session); + } + } finally { + for (SessionHandle sessionHandle : sessionHandles) { + service.closeSession(sessionHandle); + } + service.stop(); + } + } + + @Test + public void testIncrementAndDecrementConnectionsUser() throws HiveSQLException { + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 10); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 0); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 0); + CLIService service = getService(conf); + List sessionHandles = new ArrayList<>(); + try { + // open 5 connections + for (int i = 0; i < limit / 2; i++) { + SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "foo", "bar", "127.0.0.1", null); + sessionHandles.add(session); + } + + // close them all + for (SessionHandle sessionHandle : sessionHandles) { + service.closeSession(sessionHandle); + } + sessionHandles.clear(); + + // open till limit but not exceed + for (int i = 0; i < limit; i++) { + SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "ff", "bar", "127.0.0.1", null); + sessionHandles.add(session); + } + } finally { + for (SessionHandle sessionHandle : sessionHandles) { + service.closeSession(sessionHandle); + } + service.stop(); + } + } + + @Test + public void testInvalidUserName() throws HiveSQLException { + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 10); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 0); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 0); + CLIService service = getService(conf); + List sessionHandles = new ArrayList<>(); + try { + for (int i = 0; i < limit + 1; i++) { + SessionHandle session = service.openSession(CLIService.SERVER_VERSION, null, "bar", "127.0.0.1", null); + sessionHandles.add(session); + } + + for (int i = 0; i < limit + 1; i++) { + SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "", "bar", "127.0.0.1", null); + sessionHandles.add(session); + } + } finally { + for (SessionHandle sessionHandle : sessionHandles) { + service.closeSession(sessionHandle); + } + service.stop(); + } + } + + @Test + public void testInvalidIpaddress() throws HiveSQLException { + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 0); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 10); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 0); + CLIService service = getService(conf); + List sessionHandles = new ArrayList<>(); + try { + for (int i = 0; i < limit + 1; i++) { + SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "foo", "bar", null, null); + sessionHandles.add(session); + } + + for (int i = 0; i < limit + 1; i++) { + SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "foo", "bar", "", null); + sessionHandles.add(session); + } + } finally { + for (SessionHandle sessionHandle : sessionHandles) { + service.closeSession(sessionHandle); + } + service.stop(); + } + } + + @Test + public void testInvalidUserIpaddress() throws HiveSQLException { + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 0); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 10); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 0); + CLIService service = getService(conf); + List sessionHandles = new ArrayList<>(); + try { + for (int i = 0; i < limit + 1; i++) { + SessionHandle session = service.openSession(CLIService.SERVER_VERSION, " ", "bar", null, null); + sessionHandles.add(session); + } + } finally { + for (SessionHandle sessionHandle : sessionHandles) { + service.closeSession(sessionHandle); + } + service.stop(); + } + } + + @Test + public void testConnectionLimitPerUser() throws HiveSQLException { + thrown.expect(HiveSQLException.class); + thrown.expectMessage("Connection limit per user reached (user: foo limit: 10)"); + + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 10); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 0); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 0); + CLIService service = getService(conf); + List sessionHandles = new ArrayList<>(); + try { + for (int i = 0; i < limit + 1; i++) { + SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "foo", "bar", "127.0.0.1", null); + sessionHandles.add(session); + } + + } finally { + for (SessionHandle sessionHandle : sessionHandles) { + service.closeSession(sessionHandle); + } + service.stop(); + } + } + + @Test + public void testConnectionLimitPerIpAddress() throws HiveSQLException { + thrown.expect(HiveSQLException.class); + thrown.expectMessage("Connection limit per ipaddress reached (ipaddress: 127.0.0.1 limit: 10)"); + + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 0); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 10); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 0); + CLIService service = getService(conf); + List sessionHandles = new ArrayList<>(); + try { + for (int i = 0; i < limit + 1; i++) { + SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "foo", "bar", "127.0.0.1", null); + sessionHandles.add(session); + } + + } finally { + for (SessionHandle sessionHandle : sessionHandles) { + service.closeSession(sessionHandle); + } + service.stop(); + } + } + + @Test + public void testConnectionLimitPerUserIpAddress() throws HiveSQLException { + thrown.expect(HiveSQLException.class); + thrown.expectMessage("Connection limit per user:ipaddress reached (user:ipaddress: foo:127.0.0.1 limit: 10)"); + + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 0); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 0); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 10); + CLIService service = getService(conf); + List sessionHandles = new ArrayList<>(); + try { + for (int i = 0; i < limit + 1; i++) { + SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "foo", "bar", "127.0.0.1", null); + sessionHandles.add(session); + } + + } finally { + for (SessionHandle sessionHandle : sessionHandles) { + service.closeSession(sessionHandle); + } + service.stop(); + } + } + + @Test + public void testConnectionMultipleLimitsUserAndIP() throws HiveSQLException { + thrown.expect(HiveSQLException.class); + thrown.expectMessage("Connection limit per user reached (user: foo limit: 5)"); + + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 5); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 10); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 0); + CLIService service = getService(conf); + List sessionHandles = new ArrayList<>(); + try { + for (int i = 0; i < limit + 1; i++) { + SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "foo", "bar", "127.0.0.1", null); + sessionHandles.add(session); + } + + } finally { + for (SessionHandle sessionHandle : sessionHandles) { + service.closeSession(sessionHandle); + } + service.stop(); + } + } + + @Test + public void testConnectionMultipleLimitsIPAndUserIP() throws HiveSQLException { + thrown.expect(HiveSQLException.class); + thrown.expectMessage("Connection limit per ipaddress reached (ipaddress: 127.0.0.1 limit: 5)"); + + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 0); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 5); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 10); + CLIService service = getService(conf); + List sessionHandles = new ArrayList<>(); + try { + for (int i = 0; i < limit + 1; i++) { + SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "foo", "bar", "127.0.0.1", null); + sessionHandles.add(session); + } + + } finally { + for (SessionHandle sessionHandle : sessionHandles) { + service.closeSession(sessionHandle); + } + service.stop(); + } + } + + @Test + public void testConnectionMultipleLimitsUserIPAndUser() throws HiveSQLException { + thrown.expect(HiveSQLException.class); + thrown.expectMessage("Connection limit per user:ipaddress reached (user:ipaddress: foo:127.0.0.1 limit: 10)"); + + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 15); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 0); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 10); + CLIService service = getService(conf); + List sessionHandles = new ArrayList<>(); + try { + for (int i = 0; i < limit + 1; i++) { + SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "foo", "bar", "127.0.0.1", null); + sessionHandles.add(session); + } + + } finally { + for (SessionHandle sessionHandle : sessionHandles) { + service.closeSession(sessionHandle); + } + service.stop(); + } + } + + @Test + public void testConnectionForwardedIpAddresses() throws HiveSQLException { + thrown.expect(HiveSQLException.class); + thrown.expectMessage("Connection limit per ipaddress reached (ipaddress: 194.167.0.3 limit: 10)"); + + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 0); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 10); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 10); + CLIService service = getService(conf); + SessionManager.setForwardedAddresses(Lists.newArrayList("194.167.0.3", "194.167.0.2", "194.167.0.1")); + List sessionHandles = new ArrayList<>(); + try { + for (int i = 0; i < limit + 1; i++) { + SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "foo", "bar", "194.167.0.1", null); + sessionHandles.add(session); + } + + } finally { + SessionManager.setForwardedAddresses(Collections.emptyList()); + for (SessionHandle sessionHandle : sessionHandles) { + service.closeSession(sessionHandle); + } + service.stop(); + } + } + + private CLIService getService(HiveConf conf) { + conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, + "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); + CLIService service = new CLIService(null); + service.init(conf); + service.start(); + return service; + } +} diff --git a/service/src/test/org/apache/hive/service/cli/session/TestPluggableHiveSessionImpl.java b/service/src/test/org/apache/hive/service/cli/session/TestPluggableHiveSessionImpl.java index 47f95c5..90237c0 100644 --- a/service/src/test/org/apache/hive/service/cli/session/TestPluggableHiveSessionImpl.java +++ b/service/src/test/org/apache/hive/service/cli/session/TestPluggableHiveSessionImpl.java @@ -19,6 +19,8 @@ import static org.junit.Assert.assertEquals; +import java.util.List; + import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hive.service.cli.CLIService; import org.apache.hive.service.cli.HiveSQLException; @@ -87,8 +89,8 @@ public void testSessionImplWithUGI() throws Exception { public static final int MAGIC_RETURN_VALUE = 0xbeef0001; public SampleHiveSessionImpl(SessionHandle sessionHandle, TProtocolVersion protocol, - String username, String password, HiveConf serverhiveConf, String ipAddress) { - super(sessionHandle, protocol, username, password, serverhiveConf, ipAddress); + String username, String password, HiveConf serverhiveConf, String ipAddress, List forwardAddresses) { + super(sessionHandle, protocol, username, password, serverhiveConf, ipAddress, forwardAddresses); } @Override @@ -103,9 +105,9 @@ public long getNoOperationTime() { public SampleHiveSessionImplWithUGI(SessionHandle sessionHandle, TProtocolVersion protocol, String username, String password, HiveConf serverhiveConf, String ipAddress, - String delegationToken) throws HiveSQLException { + String delegationToken, List forwardedAddresses) throws HiveSQLException { super(sessionHandle, protocol, username, password, serverhiveConf, ipAddress, - delegationToken); + delegationToken, forwardedAddresses); } @Override