diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index a3c853a..649f2c3 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2471,6 +2471,17 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal " PERFORMANCE: Execution + Performance logs \n" + " VERBOSE: All logs" ), + // HS2 connections guard rails + HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER("hive.server2.limit.connections.per.user", 0, + "Maximum hive server2 connections per user. Any user exceeding this limit will not be allowed to connect. " + + "Default=0 does not enforce limits."), + HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS("hive.server2.limit.connections.per.ipaddress", 0, + "Maximum hive server2 connections per ipaddress. Any ipaddress exceeding this limit will not be allowed " + + "to connect. Default=0 does not enforce limits."), + HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS("hive.server2.limit.connections.per.user.ipaddress", 0, + "Maximum hive server2 connections per user:ipaddress combination. Any user-ipaddress exceeding this limit will " + + "not be allowed to connect. Default=0 does not enforce limits."), + // Enable metric collection for HiveServer2 HIVE_SERVER2_METRICS_ENABLED("hive.server2.metrics.enabled", false, "Enable metrics on the HiveServer2."), diff --git a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java index 9b2ae57..e5b2c2d 100644 --- a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java +++ b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java @@ -33,6 +33,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; @@ -43,7 +44,6 @@ import org.apache.hadoop.hive.common.metrics.common.MetricsVariable; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.ql.hooks.HookUtils; import org.apache.hadoop.hive.ql.hooks.HooksLoader; import org.apache.hive.service.CompositeService; import org.apache.hive.service.cli.HiveSQLException; @@ -68,6 +68,10 @@ private HiveConf hiveConf; private final Map handleToSession = new ConcurrentHashMap(); + private final Map connectionsCount = new ConcurrentHashMap<>(); + private int userLimit; + private int ipAddressLimit; + private int userIpAddressLimit; private final OperationManager operationManager = new OperationManager(); private ThreadPoolExecutor backgroundOperationPool; private boolean isOperationLogEnabled; @@ -103,6 +107,12 @@ public synchronized void init(HiveConf hiveConf) { registerOpenSesssionMetrics(metrics); registerActiveSesssionMetrics(metrics); } + + userLimit = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER); + ipAddressLimit = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS); + userIpAddressLimit = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS); + LOG.info("Connections limit are user: {} ipaddress: {} user-ipaddress: {}", userLimit, ipAddressLimit, + userIpAddressLimit); super.init(hiveConf); } @@ -368,6 +378,8 @@ public HiveSession createSession(SessionHandle sessionHandle, TProtocolVersion p String delegationToken) throws HiveSQLException { + incrementConnections(username, ipAddress); + HiveSession session; // If doAs is set to true for HiveServer2, we will create a proxy object for the session impl. // Within the proxy object, we wrap the method call in a UserGroupInformation#doAs @@ -439,6 +451,83 @@ public HiveSession createSession(SessionHandle sessionHandle, TProtocolVersion p return session; } + private void incrementConnections(final String username, final String ipAddress) throws HiveSQLException { + String violation = anyViolations(username, ipAddress); + // increment the counters only when there are no violations + if (violation == null) { + if (trackConnectionsPerUser(username)) { + connectionsCount.computeIfAbsent(username, k -> new LongAdder()).increment(); + } + + if (trackConnectionsPerIpAddress(ipAddress)) { + connectionsCount.computeIfAbsent(ipAddress, k -> new LongAdder()).increment(); + } + + if (trackConnectionsPerUserIpAddress(username, ipAddress)) { + connectionsCount.computeIfAbsent(username + ":" + ipAddress, k -> new LongAdder()).increment(); + } + } else { + LOG.error(violation); + throw new HiveSQLException(violation); + } + } + + private String anyViolations(final String username, final String ipAddress) { + if (trackConnectionsPerUser(username) && !withinLimits(username, userLimit)) { + return "Connection limit (per user limit: " + userLimit + ") reached"; + } + + if (trackConnectionsPerIpAddress(ipAddress) && !withinLimits(ipAddress, ipAddressLimit)) { + return "Connection limit (per ipaddress limit: " + ipAddressLimit + ") reached"; + } + + if (trackConnectionsPerUserIpAddress(username, ipAddress) && + !withinLimits(username + ":" + ipAddress, userIpAddressLimit)) { + return "Connection limit (per user:ipaddress limit: " + userIpAddressLimit + ") reached"; + } + + return null; + } + + private boolean trackConnectionsPerUserIpAddress(final String username, final String ipAddress) { + return userIpAddressLimit > 0 && username != null && !username.isEmpty() && ipAddress != null && + !ipAddress.isEmpty(); + } + + private boolean trackConnectionsPerIpAddress(final String ipAddress) { + return ipAddressLimit > 0 && ipAddress != null && !ipAddress.isEmpty(); + } + + private boolean trackConnectionsPerUser(final String username) { + return userLimit > 0 && username != null && !username.isEmpty(); + } + + private boolean withinLimits(final String track, final int limit) { + if (connectionsCount.containsKey(track)) { + final int connectionCount = connectionsCount.get(track).intValue(); + if (connectionCount == limit) { + return false; + } + } + return true; + } + + private void decrementConnections(final HiveSession session) { + final String username = session.getUserName(); + final String ipAddress = session.getIpAddress(); + if (trackConnectionsPerUser(username)) { + connectionsCount.computeIfPresent(username, (k, v) -> v).decrement(); + } + + if (trackConnectionsPerIpAddress(ipAddress)) { + connectionsCount.computeIfPresent(ipAddress, (k, v) -> v).decrement(); + } + + if (trackConnectionsPerUserIpAddress(username, ipAddress)) { + connectionsCount.computeIfPresent(username + ":" + ipAddress, (k, v) -> v).decrement(); + } + } + public synchronized void closeSession(SessionHandle sessionHandle) throws HiveSQLException { HiveSession session = handleToSession.remove(sessionHandle); if (session == null) { @@ -448,6 +537,7 @@ public synchronized void closeSession(SessionHandle sessionHandle) throws HiveSQ try { session.close(); } finally { + decrementConnections(session); // Shutdown HiveServer2 if it has been deregistered from ZooKeeper and has no active sessions if (!(hiveServer2 == null) && (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) && (hiveServer2.isDeregisteredWithZooKeeper())) { diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 6354c8c..fc9e6b2 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -320,8 +320,6 @@ public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException { LOG.info("Client protocol version: " + req.getClient_protocol()); TOpenSessionResp resp = new TOpenSessionResp(); try { - Map openConf = req.getConfiguration(); - SessionHandle sessionHandle = getSessionHandle(req, resp); resp.setSessionHandle(sessionHandle.toTSessionHandle()); Map configurationMap = new HashMap(); diff --git a/service/src/test/org/apache/hive/service/cli/TestCLIServiceConnectionLimits.java b/service/src/test/org/apache/hive/service/cli/TestCLIServiceConnectionLimits.java new file mode 100644 index 0000000..21a2247 --- /dev/null +++ b/service/src/test/org/apache/hive/service/cli/TestCLIServiceConnectionLimits.java @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.service.cli; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class TestCLIServiceConnectionLimits { + @org.junit.Rule + public ExpectedException thrown = ExpectedException.none(); + + private int limit = 10; + private HiveConf conf = new HiveConf(); + + @Test + public void testNoLimit() throws HiveSQLException { + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 0); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 0); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 0); + CLIService service = getService(conf); + List sessionHandles = new ArrayList<>(); + try { + for (int i = 0; i < limit + 1; i++) { + SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "foo", "bar", "127.0.0.1", null); + sessionHandles.add(session); + } + } finally { + for (SessionHandle sessionHandle : sessionHandles) { + service.closeSession(sessionHandle); + } + service.stop(); + } + } + + @Test + public void testIncrementAndDecrementConnectionsUser() throws HiveSQLException { + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 10); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 0); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 0); + CLIService service = getService(conf); + List sessionHandles = new ArrayList<>(); + try { + // open 5 connections + for (int i = 0; i < limit / 2; i++) { + SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "foo", "bar", "127.0.0.1", null); + sessionHandles.add(session); + } + + // close them all + for (SessionHandle sessionHandle : sessionHandles) { + service.closeSession(sessionHandle); + } + sessionHandles.clear(); + + // open till limit but not exceed + for (int i = 0; i < limit; i++) { + SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "ff", "bar", "127.0.0.1", null); + sessionHandles.add(session); + } + } finally { + for (SessionHandle sessionHandle : sessionHandles) { + service.closeSession(sessionHandle); + } + service.stop(); + } + } + + @Test + public void testInvalidUserName() throws HiveSQLException { + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 10); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 0); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 0); + CLIService service = getService(conf); + List sessionHandles = new ArrayList<>(); + try { + for (int i = 0; i < limit + 1; i++) { + SessionHandle session = service.openSession(CLIService.SERVER_VERSION, null, "bar", "127.0.0.1", null); + sessionHandles.add(session); + } + + for (int i = 0; i < limit + 1; i++) { + SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "", "bar", "127.0.0.1", null); + sessionHandles.add(session); + } + } finally { + for (SessionHandle sessionHandle : sessionHandles) { + service.closeSession(sessionHandle); + } + service.stop(); + } + } + + @Test + public void testInvalidIpaddress() throws HiveSQLException { + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 0); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 10); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 0); + CLIService service = getService(conf); + List sessionHandles = new ArrayList<>(); + try { + for (int i = 0; i < limit + 1; i++) { + SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "foo", "bar", null, null); + sessionHandles.add(session); + } + + for (int i = 0; i < limit + 1; i++) { + SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "foo", "bar", "", null); + sessionHandles.add(session); + } + } finally { + for (SessionHandle sessionHandle : sessionHandles) { + service.closeSession(sessionHandle); + } + service.stop(); + } + } + + @Test + public void testInvalidUserIpaddress() throws HiveSQLException { + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 0); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 10); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 0); + CLIService service = getService(conf); + List sessionHandles = new ArrayList<>(); + try { + for (int i = 0; i < limit + 1; i++) { + SessionHandle session = service.openSession(CLIService.SERVER_VERSION, " ", "bar", null, null); + sessionHandles.add(session); + } + } finally { + for (SessionHandle sessionHandle : sessionHandles) { + service.closeSession(sessionHandle); + } + service.stop(); + } + } + + @Test + public void testConnectionLimitPerUser() throws HiveSQLException { + thrown.expect(HiveSQLException.class); + thrown.expectMessage("Connection limit (per user limit: 10) reached"); + + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 10); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 0); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 0); + CLIService service = getService(conf); + List sessionHandles = new ArrayList<>(); + try { + for (int i = 0; i < limit + 1; i++) { + SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "foo", "bar", "127.0.0.1", null); + sessionHandles.add(session); + } + + } finally { + for (SessionHandle sessionHandle : sessionHandles) { + service.closeSession(sessionHandle); + } + service.stop(); + } + } + + @Test + public void testConnectionLimitPerIpAddress() throws HiveSQLException { + thrown.expect(HiveSQLException.class); + thrown.expectMessage("Connection limit (per ipaddress limit: 10) reached"); + + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 0); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 10); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 0); + CLIService service = getService(conf); + List sessionHandles = new ArrayList<>(); + try { + for (int i = 0; i < limit + 1; i++) { + SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "foo", "bar", "127.0.0.1", null); + sessionHandles.add(session); + } + + } finally { + for (SessionHandle sessionHandle : sessionHandles) { + service.closeSession(sessionHandle); + } + service.stop(); + } + } + + @Test + public void testConnectionLimitPerUserIpAddress() throws HiveSQLException { + thrown.expect(HiveSQLException.class); + thrown.expectMessage("Connection limit (per user:ipaddress limit: 10) reached"); + + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 0); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 0); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 10); + CLIService service = getService(conf); + List sessionHandles = new ArrayList<>(); + try { + for (int i = 0; i < limit + 1; i++) { + SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "foo", "bar", "127.0.0.1", null); + sessionHandles.add(session); + } + + } finally { + for (SessionHandle sessionHandle : sessionHandles) { + service.closeSession(sessionHandle); + } + service.stop(); + } + } + + @Test + public void testConnectionMultipleLimitsUserAndIP() throws HiveSQLException { + thrown.expect(HiveSQLException.class); + thrown.expectMessage("Connection limit (per user limit: 5) reached"); + + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 5); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 10); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 0); + CLIService service = getService(conf); + List sessionHandles = new ArrayList<>(); + try { + for (int i = 0; i < limit + 1; i++) { + SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "foo", "bar", "127.0.0.1", null); + sessionHandles.add(session); + } + + } finally { + for (SessionHandle sessionHandle : sessionHandles) { + service.closeSession(sessionHandle); + } + service.stop(); + } + } + + @Test + public void testConnectionMultipleLimitsIPAndUserIP() throws HiveSQLException { + String errMsg = "Connection limit (per ipaddress limit: 5) reached"; + thrown.expect(HiveSQLException.class); + thrown.expectMessage(errMsg); + + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 0); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 5); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 10); + CLIService service = getService(conf); + List sessionHandles = new ArrayList<>(); + try { + for (int i = 0; i < limit + 1; i++) { + SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "foo", "bar", "127.0.0.1", null); + sessionHandles.add(session); + } + + } finally { + for (SessionHandle sessionHandle : sessionHandles) { + service.closeSession(sessionHandle); + } + service.stop(); + } + } + + @Test + public void testConnectionMultipleLimitsUserIPAndUser() throws HiveSQLException { + thrown.expect(HiveSQLException.class); + thrown.expectMessage("Connection limit (per user:ipaddress limit: 10) reached"); + + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER, 15); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_IPADDRESS, 0); + conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER_IPADDRESS, 10); + CLIService service = getService(conf); + List sessionHandles = new ArrayList<>(); + try { + for (int i = 0; i < limit + 1; i++) { + SessionHandle session = service.openSession(CLIService.SERVER_VERSION, "foo", "bar", "127.0.0.1", null); + sessionHandles.add(session); + } + + } finally { + for (SessionHandle sessionHandle : sessionHandles) { + service.closeSession(sessionHandle); + } + service.stop(); + } + } + + private CLIService getService(HiveConf conf) { + conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, + "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); + CLIService service = new CLIService(null); + service.init(conf); + service.start(); + return service; + } +}