diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 02367eb433..7642e474f8 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2512,6 +2512,9 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "This flag is used in HiveServer2 to enable a user to use HiveServer2 without\n" + "turning on Tez for HiveServer2. The user could potentially want to run queries\n" + "over Tez without the pool of sessions."), + HIVE_SERVER2_TEZ_QUEUE_ACCESS_CHECK("hive.server2.tez.queue.access.check", false, + "Whether to check user access to explicitly specified YARN queues. " + + "yarn.resourcemanager.webapp.address must be configured to use this."), HIVE_SERVER2_TEZ_SESSION_LIFETIME("hive.server2.tez.session.lifetime", "162h", new TimeValidator(TimeUnit.HOURS), "The lifetime of the Tez sessions launched by HS2 when default sessions are enabled.\n" + diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java index ed3984efe8..602439bcaa 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -2004,6 +2004,7 @@ private void execute() throws CommandProcessorResponse { SessionState ss = SessionState.get(); + // TODO: should this use getUserFromAuthenticator? 
hookContext = new PrivateHookContext(plan, queryState, ctx.getPathToCS(), SessionState.get().getUserName(), ss.getUserIpAddress(), InetAddress.getLocalHost().getHostAddress(), operationId, ss.getSessionId(), Thread.currentThread().getName(), ss.isHiveServerQuery(), perfLogger, queryInfo, ctx); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java index 1de333e985..a0a90a96f0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java @@ -185,7 +185,7 @@ private int createPermanentFunction(Hive db, CreateFunctionDesc createFunctionDe funcName, dbName, className, - SessionState.get().getUserName(), + SessionState.get().getUserName(), // TODO: should this use getUserFromAuthenticator? PrincipalType.USER, (int) (System.currentTimeMillis() / 1000), org.apache.hadoop.hive.metastore.api.FunctionType.JAVA, diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index a051f90195..2633390861 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -26,21 +26,23 @@ import java.util.List; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; + import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan; import org.apache.hadoop.hive.metastore.api.WMTrigger; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolSession.Manager; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.wm.ExecutionTrigger; import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider; import 
org.apache.hadoop.hive.ql.wm.Trigger; import org.apache.hadoop.hive.ql.wm.TriggerActionHandler; import org.apache.hadoop.hive.shims.Utils; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.tez.dag.api.TezConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import com.google.common.annotations.VisibleForTesting; /** @@ -82,8 +84,9 @@ /** This is used to close non-default sessions, and also all sessions when stopping. */ private final List openSessions = new LinkedList<>(); private SessionTriggerProvider sessionTriggerProvider; - private TriggerActionHandler triggerActionHandler; + private TriggerActionHandler triggerActionHandler; private TriggerValidatorRunnable triggerValidatorRunnable; + private YarnQueueHelper yarnQueueChecker; /** Note: this is not thread-safe. */ public static TezSessionPoolManager getInstance() { @@ -99,6 +102,9 @@ protected TezSessionPoolManager() { } public void startPool(HiveConf conf, final WMFullResourcePlan resourcePlan) throws Exception { + if (restrictedConfig == null) { // Sanity check; restrictedConfig is always set in setup. + throw new AssertionError("setupPool or setupNonPool needs to be called first"); + } if (defaultSessionPool != null) { defaultSessionPool.start(); } @@ -108,7 +114,8 @@ public void startPool(HiveConf conf, final WMFullResourcePlan resourcePlan) thro initTriggers(conf); if (resourcePlan != null) { updateTriggers(resourcePlan); - LOG.info("Updated tez session pool manager with active resource plan: {}", resourcePlan.getPlan().getName()); + LOG.info("Updated tez session pool manager with active resource plan: {}", + resourcePlan.getPlan().getName()); } } @@ -159,10 +166,21 @@ public TezSessionPoolSession create(TezSessionPoolSession oldSession) { }); } + setupNonPool(conf); + + // Only creates the expiration tracker if expiration is configured. 
+ expirationTracker = SessionExpirationTracker.create(conf, this); + + // From this point on, session creation will wait for the default pool (if # of sessions > 0). + this.hasInitialSessions = numSessionsTotal > 0; + } + + public void setupNonPool(HiveConf conf) { + this.initConf = conf; numConcurrentLlapQueries = conf.getIntVar(ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES); llapQueue = new Semaphore(numConcurrentLlapQueries, true); - String queueAllowedStr = HiveConf.getVar(initConf, + String queueAllowedStr = HiveConf.getVar(conf, ConfVars.HIVE_SERVER2_TEZ_SESSION_CUSTOM_QUEUE_ALLOWED); try { this.customQueueAllowed = CustomQueueAllowed.valueOf(queueAllowedStr.toUpperCase()); @@ -170,16 +188,12 @@ public TezSessionPoolSession create(TezSessionPoolSession oldSession) { throw new RuntimeException("Invalid value '" + queueAllowedStr + "' for " + ConfVars.HIVE_SERVER2_TEZ_SESSION_CUSTOM_QUEUE_ALLOWED.varname); } + if (customQueueAllowed == CustomQueueAllowed.TRUE + && HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_TEZ_QUEUE_ACCESS_CHECK)) { + this.yarnQueueChecker = new YarnQueueHelper(conf); + } restrictedConfig = new RestrictedConfigChecker(conf); - // Only creates the expiration tracker if expiration is configured. - expirationTracker = SessionExpirationTracker.create(conf, this); - - // From this point on, session creation will wait for the default pool (if # of sessions > 0). 
- this.hasInitialSessions = numSessionsTotal > 0; - if (!hasInitialSessions) { - return; - } } public void initTriggers(final HiveConf conf) { @@ -219,7 +233,8 @@ private TezSessionState getSession(HiveConf conf, boolean doOpen) throws Excepti boolean hasQueue = (queueName != null) && !queueName.isEmpty(); if (hasQueue) { switch (customQueueAllowed) { - case FALSE: throw new HiveException("Specifying " + TezConfiguration.TEZ_QUEUE_NAME + " is not allowed"); + case FALSE: throw new HiveException("Specifying " + + TezConfiguration.TEZ_QUEUE_NAME + " is not allowed"); case IGNORE: { LOG.warn("User has specified " + queueName + " queue; ignoring the setting"); queueName = null; @@ -228,6 +243,20 @@ private TezSessionState getSession(HiveConf conf, boolean doOpen) throws Excepti } default: // All good. } + + if (yarnQueueChecker != null) { + SessionState ss = SessionState.get(); + String userName = null; + if (ss != null) { + userName = ss.getAuthenticator() != null + ? ss.getAuthenticator().getUserName() : ss.getUserName(); + } + if (userName == null) { + userName = Utils.getUGI().getShortUserName(); + LOG.info("No session user set; using the UGI user " + userName); + } + yarnQueueChecker.checkQueueAccess(queueName, userName); + } } // Check the restricted configs that the users cannot set. @@ -389,8 +418,9 @@ private static boolean canWorkWithSameSession(TezSessionState session, HiveConf } try { - UserGroupInformation ugi = Utils.getUGI(); - String userName = ugi.getShortUserName(); + // Note: this is not the calling user, but rather the user under which this session will + // actually run (which is a different user under doAs=false). This seems to be intended. + String userName = Utils.getUGI().getShortUserName(); // TODO Will these checks work if some other user logs in. Isn't a doAs check required somewhere here as well. // Should a doAs check happen here instead of after the user test. 
// With HiveServer2 - who is the incoming user in terms of UGI (the hive user itself, or the user who actually submitted the query) diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index a5f4cb7539..cbf0b7a6fc 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -155,7 +155,8 @@ public int execute(DriverContext driverContext) { // We only need a username for UGI to use for groups; getGroups will fetch the groups // based on Hadoop configuration, as documented at // https://hadoop.apache.org/docs/r2.8.0/hadoop-project-dist/hadoop-common/GroupsMapping.html - String userName = ss.getUserName(); + String userName = ss.getAuthenticator() != null + ? ss.getAuthenticator().getUserName() : ss.getUserName(); List groups = null; if (userName == null) { userName = "anonymous"; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/YarnQueueHelper.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/YarnQueueHelper.java new file mode 100644 index 0000000000..d1962402e5 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/YarnQueueHelper.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.hive.ql.exec.tez; + +import java.io.IOException; +import java.io.InputStream; +import java.net.HttpURLConnection; +import java.net.URL; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.client.AuthenticatedURL; +import org.apache.hadoop.security.authentication.client.AuthenticationException; +import org.apache.http.HttpStatus; +import org.json.JSONException; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +public class YarnQueueHelper { + private final static Logger LOG = LoggerFactory.getLogger(YarnQueueHelper.class); + private static final String PERMISSION_PATH = "/ws/v1/cluster/queues/%s/access?user=%s"; + + private final String[] rmNodes; + private int lastKnownGoodUrl; + + public YarnQueueHelper(HiveConf conf) { + rmNodes = conf.getTrimmedStrings("yarn.resourcemanager.webapp.address"); + Preconditions.checkArgument((rmNodes != null && rmNodes.length > 0), + "yarn.resourcemanager.webapp.address must be set to enable queue access checks"); + lastKnownGoodUrl = 0; + } + + public void checkQueueAccess(String queueName, String userName) throws IOException { + String urlSuffix = String.format(PERMISSION_PATH, queueName, userName); + // TODO: if we ever use this endpoint for anything else, refactor cycling into a separate class. + int urlIx = lastKnownGoodUrl, lastUrlIx = ((urlIx == 0) ? 
rmNodes.length : urlIx) - 1; + Exception firstError = null; + while (true) { + String node = rmNodes[urlIx]; + try { + String error = checkQueueAccessFromSingleRm("http://" + node + urlSuffix); + lastKnownGoodUrl = urlIx; + if (error == null) return; // null error message here means the user has access. + throw new HiveException(error.isEmpty() + ? (userName + " has no access to " + queueName) : error); + } catch (Exception ex) { + LOG.warn("Cannot check queue access against RM " + node, ex); + if (firstError == null) { + firstError = ex; + } + } + if (urlIx == lastUrlIx) { + throw new IOException("Cannot access any RM service; first error", firstError); + } + urlIx = (urlIx + 1) % rmNodes.length; + } + } + + private String checkQueueAccessFromSingleRm(String urlString) throws IOException { + URL url = new URL(urlString); + HttpURLConnection connection = UserGroupInformation.isSecurityEnabled() ? + getSecureConnection(url) : (HttpURLConnection)url.openConnection(); + int statusCode = connection.getResponseCode(); + // The API has a somewhat weird contract. Forbidden means the user's access to queue is + // forbidden, not that our access to the URL is forbidden. Or so we hope, anyway - no way to + // tell these two cases apart. + switch (statusCode) { + case HttpStatus.SC_OK: return processResponse(connection); + case HttpStatus.SC_FORBIDDEN: { + // Throw a special exception since it's usually a well-known misconfiguration. 
+ throw new IOException(handleUnexpectedStatusCode(connection, statusCode, "check that the " + + "HiveServer2 principal is in the administrator list of the root YARN queue")); + } + default: throw new IOException(handleUnexpectedStatusCode(connection, statusCode, null)); + } + } + + private String processResponse(HttpURLConnection connection) throws IOException { + InputStream stream = connection.getInputStream(); + if (stream == null) { + throw new IOException(handleUnexpectedStatusCode( + connection, HttpStatus.SC_OK, "No input on successful API call")); + } + String jsonStr = IOUtils.toString(stream); + try { + JSONObject obj = new JSONObject(jsonStr); + boolean result = obj.getBoolean("allowed"); + if (result) return null; + String diag = obj.getString("diagnostics"); + return diag == null ? "" : diag; + } catch (JSONException ex) { + LOG.error("Couldn't parse " + jsonStr, ex); + throw ex; + } + + } + + /** Gets the Hadoop kerberos secure connection (not an SSL connection). */ + private HttpURLConnection getSecureConnection(URL url) throws IOException { + AuthenticatedURL.Token token = new AuthenticatedURL.Token(); + try { + return new AuthenticatedURL().openConnection(url, token); + } catch (AuthenticationException e) { + throw new IOException(e); + } + } + + public String handleUnexpectedStatusCode( + HttpURLConnection connection, int statusCode, String errorStr) throws IOException { + // We do not handle anything but OK for now. Again, we need a real client for this API. + // TODO: handle 401 and return a new connection? nothing for now + InputStream errorStream = connection.getErrorStream(); + String error = "Received " + statusCode + (errorStr == null ? 
"" : (" (" + errorStr + ")")); + if (errorStream != null) { + error += ": " + IOUtils.toString(errorStream); + } else { + errorStream = connection.getInputStream(); + if (errorStream != null) { + error += ": " + IOUtils.toString(errorStream); + } + } + return error; + } +} + + diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index ed1c0abdf2..fcd38ebe0f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@ -3478,6 +3478,7 @@ private void analyzeAlterTableAddParts(String[] qualified, CommonTree ast, boole cmd.append(")"); } SessionState ss = SessionState.get(); + // TODO: should this use getUserFromAuthenticator? String uName = (ss == null? null: ss.getUserName()); Driver driver = new Driver(conf, uName, queryState.getLineageState()); int rc = driver.compile(cmd.toString(), false); diff --git ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFLoggedInUser.java ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFLoggedInUser.java index 3ed793ec48..3d36e0fe53 100644 --- ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFLoggedInUser.java +++ ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFLoggedInUser.java @@ -45,6 +45,7 @@ public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumen } if (loggedInUser == null) { + // TODO: getUserFromAuthenticator? 
String loggedInUserName = SessionState.get().getUserName(); if (loggedInUserName != null) { loggedInUser = new Text(loggedInUserName); diff --git service/src/java/org/apache/hive/service/server/HiveServer2.java service/src/java/org/apache/hive/service/server/HiveServer2.java index 6308c5cd4f..47f84b5e73 100644 --- service/src/java/org/apache/hive/service/server/HiveServer2.java +++ service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -665,7 +665,12 @@ public synchronized void start() { if (!activePassiveHA) { LOG.info("HS2 interactive HA not enabled. Starting tez sessions.."); - startOrReconnectTezSessions(); + try { + startOrReconnectTezSessions(); + } catch (Exception e) { + LOG.error("Error starting Tez sessions: ", e); + throw new ServiceException(e); + } } else { LOG.info("HS2 interactive HA enabled. Tez sessions will be started/reconnected by the leader."); } @@ -738,6 +743,8 @@ private void initAndStartTezSessionPoolManager(final WMFullResourcePlan resource tezSessionPoolManager = TezSessionPoolManager.getInstance(); if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) { tezSessionPoolManager.setupPool(hiveConf); + } else { + tezSessionPoolManager.setupNonPool(hiveConf); } tezSessionPoolManager.startPool(hiveConf, resourcePlan); LOG.info("Tez session pool manager initialized.");