diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8d9b5a3194..22ce79aade 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2503,6 +2503,9 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "This flag is used in HiveServer2 to enable a user to use HiveServer2 without\n" +
         "turning on Tez for HiveServer2. The user could potentially want to run queries\n" +
         "over Tez without the pool of sessions."),
+    HIVE_SERVER2_TEZ_QUEUE_ACCESS_CHECK("hive.server2.tez.queue.access.check", false,
+        "Whether to check user access to explicitly specified YARN queues. " +
+        "yarn.resourcemanager.webapp.address must be configured to use this."),
     HIVE_SERVER2_TEZ_SESSION_LIFETIME("hive.server2.tez.session.lifetime", "162h",
         new TimeValidator(TimeUnit.HOURS),
         "The lifetime of the Tez sessions launched by HS2 when default sessions are enabled.\n" +
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index a051f90195..5f81f96bd5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -26,21 +26,23 @@
 import java.util.List;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
 import org.apache.hadoop.hive.metastore.api.WMTrigger;
 import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolSession.Manager;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.wm.ExecutionTrigger;
 import org.apache.hadoop.hive.ql.wm.SessionTriggerProvider;
 import org.apache.hadoop.hive.ql.wm.Trigger;
 import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
 import org.apache.hadoop.hive.shims.Utils;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -82,8 +84,9 @@
   /** This is used to close non-default sessions, and also all sessions when stopping. */
   private final List<TezSessionPoolSession> openSessions = new LinkedList<>();
   private SessionTriggerProvider sessionTriggerProvider;
-  private TriggerActionHandler triggerActionHandler;
+  private TriggerActionHandler triggerActionHandler;
   private TriggerValidatorRunnable triggerValidatorRunnable;
+  private YarnQueueHelper yarnQueueChecker;
 
   /** Note: this is not thread-safe. */
   public static TezSessionPoolManager getInstance() {
@@ -170,6 +173,10 @@ public TezSessionPoolSession create(TezSessionPoolSession oldSession) {
       throw new RuntimeException("Invalid value '" + queueAllowedStr + "' for "
          + ConfVars.HIVE_SERVER2_TEZ_SESSION_CUSTOM_QUEUE_ALLOWED.varname);
     }
+    if (customQueueAllowed == CustomQueueAllowed.TRUE
+        && HiveConf.getBoolVar(conf, ConfVars.HIVE_SERVER2_TEZ_QUEUE_ACCESS_CHECK)) {
+      this.yarnQueueChecker = new YarnQueueHelper(conf);
+    }
     restrictedConfig = new RestrictedConfigChecker(conf);
 
     // Only creates the expiration tracker if expiration is configured.
@@ -219,7 +226,8 @@ private TezSessionState getSession(HiveConf conf, boolean doOpen) throws Excepti
     boolean hasQueue = (queueName != null) && !queueName.isEmpty();
     if (hasQueue) {
       switch (customQueueAllowed) {
-      case FALSE: throw new HiveException("Specifying " + TezConfiguration.TEZ_QUEUE_NAME + " is not allowed");
+      case FALSE: throw new HiveException("Specifying "
+          + TezConfiguration.TEZ_QUEUE_NAME + " is not allowed");
       case IGNORE: {
         LOG.warn("User has specified " + queueName + " queue; ignoring the setting");
         queueName = null;
@@ -228,6 +236,20 @@ private TezSessionState getSession(HiveConf conf, boolean doOpen) throws Excepti
       }
       default: // All good.
       }
+
+      if (yarnQueueChecker != null) {
+        SessionState ss = SessionState.get();
+        String userName = null;
+        if (ss != null && ss.getUserName() != null) {
+          userName = ss.getUserName();
+        } else {
+          userName = Utils.getUGI().getShortUserName();
+          LOG.info("No session user set; using the UGI user " + userName);
+        }
+        if (!yarnQueueChecker.hasQueueAccess(queueName, userName)) {
+          throw new HiveException(userName + " has no access to " + queueName);
+        }
+      }
     }
 
     // Check the restricted configs that the users cannot set.
@@ -389,8 +411,9 @@ private static boolean canWorkWithSameSession(TezSessionState session, HiveConf
     }
 
     try {
-      UserGroupInformation ugi = Utils.getUGI();
-      String userName = ugi.getShortUserName();
+      // Note: this is not the calling user, but rather the user under which this session will
+      // actually run (which is different under doAs=false). This seems to be intended.
+      String userName = Utils.getUGI().getShortUserName();
       // TODO Will these checks work if some other user logs in. Isn't a doAs check required somewhere here as well.
       //      Should a doAs check happen here instead of after the user test.
       //      With HiveServer2 - who is the incoming user in terms of UGI (the hive user itself, or the user who actually submitted the query)
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/YarnQueueHelper.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/YarnQueueHelper.java
new file mode 100644
index 0000000000..3abc57e6cb
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/YarnQueueHelper.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.http.HttpStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class YarnQueueHelper {
+  private final static Logger LOG = LoggerFactory.getLogger(YarnQueueHelper.class);
+  private static final String PERMISSION_PATH = "/ws/v1/cluster/%s/default/access?user=%s";
+
+  private final String[] rmNodes;
+  private int lastKnownGoodUrl;
+
+  public YarnQueueHelper(HiveConf conf) {
+    rmNodes = conf.getTrimmedStrings("yarn.resourcemanager.webapp.address");
+    Preconditions.checkArgument((rmNodes != null && rmNodes.length > 0),
+        "yarn.resourcemanager.webapp.address must be set to enable queue access checks");
+    lastKnownGoodUrl = 0;
+  }
+
+  public boolean hasQueueAccess(String queue, String userName) throws IOException {
+    String urlSuffix = String.format(PERMISSION_PATH, queue, userName);
+    // TODO: if we ever use this endpoint for anything else, refactor cycling into a separate class.
+    int urlIx = lastKnownGoodUrl, lastUrlIx = ((urlIx == 0) ? rmNodes.length : urlIx) - 1;
+    Exception firstError = null;
+    while (true) {
+      String node = rmNodes[urlIx];
+      try {
+        boolean result = hasQueueAccessFromSingleRm("http://" + node + urlSuffix);
+        lastKnownGoodUrl = urlIx;
+        return result;
+      } catch (Exception ex) {
+        LOG.warn("Cannot check queue access against RM " + node, ex);
+        if (firstError == null) {
+          firstError = ex;
+        }
+      }
+      if (urlIx == lastUrlIx) {
+        throw new IOException("Cannot access any RM service; first error", firstError);
+      }
+      urlIx = (urlIx + 1) % rmNodes.length;
+    }
+  }
+
+  private boolean hasQueueAccessFromSingleRm(String urlString) throws IOException {
+    URL url = new URL(urlString);
+    HttpURLConnection connection = UserGroupInformation.isSecurityEnabled() ?
+        getSecureConnection(url) : (HttpURLConnection)url.openConnection();
+    int statusCode = connection.getResponseCode();
+    // The API has a somewhat weird contract. Forbidden means the user's access to the queue is
+    // forbidden, not that our access to the URL is forbidden. Or so we hope, anyway - no way to
+    // tell these two cases apart.
+    switch (statusCode) {
+    case HttpStatus.SC_OK: return true;
+    case HttpStatus.SC_FORBIDDEN: return false;
+    default: throw new IOException(handleUnexpectedStatusCode(connection, statusCode));
+    }
+  }
+
+  /** Gets the Hadoop kerberos secure connection (not an SSL connection). */
+  private HttpURLConnection getSecureConnection(URL url) throws IOException {
+    AuthenticatedURL.Token token = new AuthenticatedURL.Token();
+    try {
+      return new AuthenticatedURL().openConnection(url, token);
+    } catch (AuthenticationException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public String handleUnexpectedStatusCode(
+      HttpURLConnection connection, int statusCode) throws IOException {
+    // We do not handle anything but OK for now. Again, we need a real client for this API.
+    // TODO: handle 401 and return a new connection? Nothing for now.
+    InputStream errorStream = connection.getErrorStream();
+    String error = "Received " + statusCode;
+    if (errorStream != null) {
+      error += ": " + IOUtils.toString(errorStream);
+    } else {
+      errorStream = connection.getInputStream();
+      if (errorStream != null) {
+        error += ": " + IOUtils.toString(errorStream);
+      }
+    }
+    return error;
+  }
+}
+
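
Reviewer note (illustrative, not part of the patch): the snippet below is a minimal sketch of how the new flag and YarnQueueHelper are expected to fit together when exercised directly, in the spirit of a small test driver. The example class name, RM webapp address, queue name, and user are made-up placeholders.

// Illustrative sketch only; not part of the patch. RM address, queue and user are placeholders.
package org.apache.hadoop.hive.ql.exec.tez;

import org.apache.hadoop.hive.conf.HiveConf;

public class YarnQueueHelperExample {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    // The pool manager only builds a YarnQueueHelper when custom queues are allowed and this
    // flag is on; here the helper is exercised directly.
    conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_QUEUE_ACCESS_CHECK, true);
    conf.set("yarn.resourcemanager.webapp.address", "rm-host:8088"); // placeholder RM address

    YarnQueueHelper checker = new YarnQueueHelper(conf);
    // Issues GET http://rm-host:8088/ws/v1/cluster/etl/default/access?user=alice;
    // per the patch, HTTP 200 means access is allowed, 403 means it is denied,
    // and any other status surfaces as an IOException.
    boolean allowed = checker.hasQueueAccess("etl", "alice"); // placeholder queue and user
    System.out.println(allowed ? "alice can use queue etl" : "alice has no access to queue etl");
  }
}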